Calling foreach twice to save to two Redis keys: only the second save succeeds
Source: 9-9 Tuning and Homework

qq_梦也_1
2020-10-29
Hello, Instructor PK. When I call foreach twice to save the offsets and the aggregated result table to Redis, the first foreach does not write its results to Redis, yet nothing fails at compile time or at run time. Am I using the method incorrectly? Or does foreach not support saving two tables at the same time, so that foreachBatch must be used instead?
The code is as follows:
object SSSOffsetsApp {

  def main(args: Array[String]): Unit = {
    Logger.getLogger(this.getClass.getSimpleName).setLevel(Level.ERROR)

    val spark = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getName)
      .config("spark.sql.shuffle.partitions", "10")
      .getOrCreate()

    val topic = "access-topic-prod"
    val groupid = ""
    val startingOffsets: String = RedisUtils.getJsonOffsets(topic, groupid)

    import spark.implicits._

    val frame: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "MacBookAir.local:9092,MacBookAir.local:9093,MacBookAir.local:9094")
      .option("subscribe", "access-topic-prod")
      .option("startingOffsets", startingOffsets)
      .load()

    storeOffsets(frame, spark)
    save2DB(frame, spark)
  }
  /**
   * Save the offsets
   * @param dataFrame
   * @param spark
   */
  def storeOffsets(dataFrame: DataFrame, spark: SparkSession): Unit = {
    import spark.implicits._
    dataFrame.selectExpr("topic", "partition", "offset") // comment this line out to see all columns, including the offsets, and use the existing offsets to set a sensible startingOffsets value
      .as[(String, Int, Long)]
      .map(x => {
        val topic = x._1
        val partition = x._2
        val offset = x._3
        // topic, partition, offset
        (topic, partition, offset)
      })
      .toDF("topic", "partition", "offset")
      .writeStream
      /* write to redis */
      .outputMode(OutputMode.Update())
      .foreach(new RedisOffsetsForeachWriter())
      .start()
      .awaitTermination()
  }
  /**
   * Save the aggregated results
   * @param dataFrame
   * @param spark
   */
  def save2DB(dataFrame: DataFrame, spark: SparkSession): Unit = {
    import spark.implicits._
    dataFrame.selectExpr("cast(value as string)")
      .as[String]
      .map(x => {
        val splits = x.split("\t")
        val time = splits(0)
        val ip = splits(2)
        // event time, day, province
        (DateUtils.parseToTimestamp(time), DateUtils.parseToDay(time), IPUtils.parseIP(ip))
      })
      .toDF("ts", "day", "province")
      .withWatermark("ts", "10 minutes")
      .groupBy("day", "province")
      .count()
      .writeStream
      /* write to redis */
      .outputMode(OutputMode.Update())
      .foreach(new RedisDBForeachWriter())
      .start()
      .awaitTermination()
  }
}
class RedisOffsetsForeachWriter extends ForeachWriter[Row] {
  var client: Jedis = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    println(s"opening connection: partitionId:$partitionId, epochId:$epochId")
    client = RedisUtils.getJedisClient()
    client != null
  }

  override def process(value: Row): Unit = {
    // read the offsets from the Row, then save them to redis
    val topic = value.getAs[String]("topic")
    val partition = value.getAs[Int]("partition")
    val offset = value.getAs[Long]("offset")
    client.hset(topic, partition + "", offset + "")
    println(s"topic:$topic, partition:$partition, offset:$offset")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (null != client) {
      RedisUtils.returnResource(client)
    }
  }
}
class RedisDBForeachWriter extends ForeachWriter[Row] {
  var client: Jedis = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    client = RedisUtils.getJedisClient()
    client != null
  }

  override def process(value: Row): Unit = {
    val day = value.getString(0)
    val province = value.getString(1)
    val cnts = value.getLong(2)
    client.hset("day-province-cnts-" + day, province, cnts + "")
    println(s"day:$day, province:$province, cnts:$cnts")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (null != client) {
      RedisUtils.returnResource(client)
    }
  }
}
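One API detail worth noting in the code as posted: each writeStream ... .start() launches an independent query, but awaitTermination() blocks the calling thread until that single query stops. Because storeOffsets ends with awaitTermination(), main would never reach save2DB. The usual pattern for running several queries in one application is to start them all and then wait on the session's StreamingQueryManager. A minimal sketch (not the course's reference solution), assuming storeOffsets and save2DB are changed to end with .start() and return the StreamingQuery instead of blocking on it:

  import org.apache.spark.sql.streaming.StreamingQuery

  // both methods now return writeStream ... .start(), without awaitTermination()
  val offsetsQuery: StreamingQuery = storeOffsets(frame, spark)
  val resultsQuery: StreamingQuery = save2DB(frame, spark)

  // block the main thread until any registered query terminates
  spark.streams.awaitAnyTermination()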
1 Answer
Michael_PK
2020-10-29
What error does the first one report? Redis writes overwrite: the second write would overwrite the earlier one, wouldn't it?
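On the foreachBatch route the question asks about: a single query can write each micro-batch to several Redis structures inside the batch function, so the offset write and the data write stay in one query. A minimal sketch (not the course's reference solution), assuming Spark 2.4+, placed inside main where spark and frame are in scope, reusing RedisUtils/DateUtils/IPUtils from the question, and swapping the watermarked streaming aggregation for per-batch counts accumulated in Redis with HINCRBY:

  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.streaming.OutputMode

  frame.writeStream
    .outputMode(OutputMode.Append())
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      batchDF.persist() // the batch is scanned twice below

      // write 1: latest offset per topic/partition, same hash layout as above
      batchDF.selectExpr("topic", "partition", "offset")
        .foreachPartition { rows: Iterator[Row] =>
          val client = RedisUtils.getJedisClient()
          rows.foreach { r =>
            client.hset(r.getAs[String]("topic"),
              r.getAs[Int]("partition").toString,
              r.getAs[Long]("offset").toString)
          }
          RedisUtils.returnResource(client)
        }

      // write 2: this batch's day/province counts, folded into Redis via HINCRBY
      import spark.implicits._
      batchDF.selectExpr("cast(value as string) as value").as[String]
        .map { line =>
          val splits = line.split("\t")
          (DateUtils.parseToDay(splits(0)), IPUtils.parseIP(splits(2)))
        }
        .toDF("day", "province")
        .groupBy("day", "province").count()
        .foreachPartition { rows: Iterator[Row] =>
          val client = RedisUtils.getJedisClient()
          rows.foreach { r =>
            client.hincrBy("day-province-cnts-" + r.getAs[String]("day"),
              r.getAs[String]("province"), r.getAs[Long]("count"))
          }
          RedisUtils.returnResource(client)
        }

      batchDF.unpersist()
    }
    .start()
    .awaitTermination()

Note that foreachBatch gives at-least-once delivery: a replayed batch would increment the counts twice, so for stronger guarantees the batchId would have to be recorded in Redis and already-applied batches skipped.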