Missing data
Source: 8-16 ForeachSink to MySQL

酱汁怪兽
2022-07-13
I'm monitoring the daily real-time logs, and the count comes up short by 2 to 50 records; it is rarely exactly right.
Does that mean this approach can't actually guarantee, with 100% certainty, that the data is monitored and delivered completely?
Since there are roughly 20,000 log records per day, I don't check one by one which records went missing; I only compare the totals.
The offline Hadoop pipeline has been stable all along, and its counts are correct.
Kafka configuration: not provided here.
Flume configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
# Offset record path
a1.sources.r1.positionFile = /home/hadoop/tmp/position/taildir_position.json
# Time log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /www/wwwroot/hs.chnau99999.com/jar/shopLogs/.*log.*
# Header
a1.sources.r1.headers.f1.headerKey1 = shop10
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = shop-topic
a1.sinks.k1.kafka.bootstrap.servers = hadoop000:9092,hadoop000:9093,hadoop000:9094
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
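To check whether records are already being dropped on the Flume-to-Kafka leg, one option is to count what actually landed in the topic and compare it against the tailed log files. Below is a minimal sketch using Spark's batch Kafka source; the broker list and topic come from the Structured Streaming code further down, and the count only covers messages still within the topic's retention window.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ShopTopicCount")
  .master("local[2]")
  .getOrCreate()

// Batch read of everything currently retained in shop-topic
val kafkaBatch = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.50.166:9092,192.168.50.166:9093,192.168.50.166:9094")
  .option("subscribe", "shop-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Compare this total with the line count of the source log files
println(kafkaBatch.count())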
Structured Streaming (SSS) code:
package com.imooc.sss.project

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

import java.sql.{Connection, DriverManager}

object StructruedShoplogsDateTime {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName(this.getClass.getName)
      .master("local[2]")
      .getOrCreate()

    // Subscribe to shop-topic on the Kafka cluster
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.50.166:9092,192.168.50.166:9093,192.168.50.166:9094")
      .option("subscribe", "shop-topic")
      .option("startingOffsets", "latest")
      .load()

    // The Kafka value column is binary, so cast it to string before splitting on the tab delimiter
    lines.createOrReplaceTempView("tmp1")
    val lines2 = spark.sql("select split(cast(value as string), '\t') as a from tmp1")
    lines2.createOrReplaceTempView("tmp2")

    // Name the split fields
    val result = spark.sql("select a[0] as start_time, " +
      "a[1] as nickname, " +
      "a[2] as distinct_id, " +
      "a[3] as address_phone, " +
      "a[4] as user_ip, " +
      "a[5] as device_brand, " +
      "a[6] as device_version, " +
      "a[7] as view_path, " +
      "a[8] as enter_type, " +
      "a[9] as end_time from tmp2")

    // Write every row to MySQL through a ForeachWriter
    result.writeStream
      .foreach(new MySqlWriter)
      .start()
      .awaitTermination()
  }

  class MySqlWriter extends ForeachWriter[Row] {
    var connection: Connection = _

    override def open(partitionId: Long, version: Long): Boolean = {
      Class.forName("com.mysql.jdbc.Driver")
      connection = DriverManager.getConnection("jdbc:mysql://192.168.50.73:4000/bi_tracking", "root", "zj123456")
      true
    }

    override def process(value: Row): Unit = {
      // replace into: reprocessing the same record will not create duplicates,
      // provided the table defines a primary or unique key
      val p = connection.prepareStatement("replace into o_wshop_original_time_s10_log (start_time, " +
        "nickname, " +
        "distinct_id, " +
        "address_phone, " +
        "user_ip, " +
        "device_brand, " +
        "device_version, " +
        "view_path, " +
        "enter_type, " +
        "end_time) " +
        "values (?,?,?,?,?,?,?,?,?,?)")
      p.setString(1, value(0).toString)
      p.setString(2, value(1).toString)
      p.setString(3, value(2).toString)
      p.setString(4, value(3).toString)
      p.setString(5, value(4).toString)
      p.setString(6, value(5).toString)
      p.setString(7, value(6).toString)
      p.setString(8, value(7).toString)
      p.setString(9, value(8).toString)
      p.setString(10, value(9).toString)
      p.execute()
      p.close() // close each prepared statement so statements do not leak
    }

    override def close(errorOrNull: Throwable): Unit = {
      if (connection != null) {
        connection.close()
      }
    }
  }
}
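One detail worth flagging in the code above: writeStream never sets a checkpointLocation, so Structured Streaming falls back to a temporary checkpoint directory, and because startingOffsets is latest, any messages produced while the job is down or restarting are skipped on the next start. A minimal sketch of pinning the checkpoint to a durable directory (the path below is an assumption, not from the original post):

result.writeStream
  .foreach(new MySqlWriter)
  // hypothetical durable path; an HDFS or local directory both work
  .option("checkpointLocation", "/home/hadoop/tmp/checkpoint/shop-topic-to-mysql")
  .start()
  .awaitTermination()

With a persisted checkpoint the query resumes from the last committed offsets instead of jumping to the newest ones.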
1 Answer
Michael_PK
2022-07-17
From your description alone, it's hard for me to judge which stage is going wrong.
Here is a way to narrow it down:
1) Are you certain the offline path and the real-time path ingest the same number of records? (A sketch for checking this follows below.)
2) Did any exceptions occur during the real-time processing? In particular, were the offsets really all committed correctly? In this code the offsets are managed by the framework itself.
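To make point 1 concrete, here is a minimal sketch that compares one day's counts and lists records that exist offline but are missing from MySQL. The Hive table name ods_shop_log and the example date are assumptions (and start_time is assumed to be in a format to_date can parse); the JDBC settings and the sink table come from the code in the question.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ShopLogCountCheck")
  .master("local[2]")
  .enableHiveSupport()
  .getOrCreate()

// Offline side: hypothetical Hive table with the same fields as the stream
val offline = spark.table("ods_shop_log")
  .where("to_date(start_time) = '2022-07-12'")

// Real-time side: read the MySQL sink table back over JDBC (connection info from the question)
val online = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.50.73:4000/bi_tracking")
  .option("dbtable", "o_wshop_original_time_s10_log")
  .option("user", "root")
  .option("password", "zj123456")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()
  .where("to_date(start_time) = '2022-07-12'")

println(s"offline = ${offline.count()}, online = ${online.count()}")

// Records present offline but missing from MySQL, keyed on distinct_id + start_time
offline.join(online, Seq("distinct_id", "start_time"), "left_anti")
  .select("start_time", "distinct_id", "view_path")
  .show(false)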