Missing data

Source: 8-16 ForeachSink to MySQL

酱汁怪兽

2022-07-13

Monitoring the daily real-time logs, the real-time count comes up short by 2 to 50 records; it is rarely exactly right.
So this approach can't actually guarantee that 100% of the data is captured and delivered, can it?
Since the daily log volume is around 20,000 records, I don't check which specific records are missing one by one; I only compare the counts.
The offline Hadoop pipeline has always been stable, and its counts are correct.
Kafka: the specific configuration is not provided here.

Flume configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
# Offset record path
a1.sources.r1.positionFile = /home/hadoop/tmp/position/taildir_position.json
# Time log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /www/wwwroot/hs.chnau99999.com/jar/shopLogs/.*log.*
# Header
a1.sources.r1.headers.f1.headerKey1 = shop10
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = shop-topic
a1.sinks.k1.kafka.bootstrap.servers = hadoop000:9092,hadoop000:9093,hadoop000:9094
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
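
One observation about this agent (an observation, not a confirmed cause of the missing records): a memory channel drops whatever events it is buffering if the Flume agent crashes or restarts, and producer.acks = 1 lets the Kafka producer treat a record as written before it is replicated, so a leader failure can lose it. To rule these out, a more conservative variant of the same agent would look roughly like the sketch below; the file-channel directories are assumed example paths, not paths from the original setup.

# Sketch only: a file channel survives agent restarts (directories are assumed paths)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/tmp/flume/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/tmp/flume/data

# Wait for all in-sync replicas to acknowledge each record
a1.sinks.k1.kafka.producer.acks = all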

Structured Streaming (SSS):

package com.imooc.sss.project

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

import java.sql.{Connection, DriverManager, Statement}

object StructruedShoplogsDateTime {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName(this.getClass.getName)
      .master("local[2]")
      .getOrCreate()
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.50.166:9092,192.168.50.166:9093,192.168.50.166:9094")
      .option("subscribe", "shop-topic")
      .option("startingOffsets", "latest")
      .load()

    lines.createOrReplaceTempView("tmp1")

    // Kafka delivers value as bytes; cast it to a string before splitting on the tab delimiter
    val lines2 = spark.sql("select split(cast(value as string),'\t') as a from tmp1")

    lines2.createOrReplaceTempView("tmp2")

    val result = spark.sql("select a[0] as start_time, " +
      "a[1] as nickname, " +
      "a[2] as distinct_id, " +
      "a[3] as address_phone, " +
      "a[4] as user_ip, " +
      "a[5] as device_brand, " +
      "a[6] as device_version, " +
      "a[7] as view_path, " +
      "a[8] as enter_type, " +
      "a[9] as end_time from tmp2")

    result.writeStream
      .foreach(new MySqlWriter)
      .start()
      .awaitTermination()
  }

  class MySqlWriter extends ForeachWriter[Row] {
    var connection: Connection = null
    var statement: Statement = null

    override def open(partitionId: Long, version: Long): Boolean = {
      Class.forName("com.mysql.jdbc.Driver")
      connection = DriverManager.getConnection("jdbc:mysql://192.168.50.73:4000/bi_tracking", "root", "zj123456")
      statement = connection.createStatement()
      true
    }

    override def process(value: Row): Unit = {
      val p = connection.prepareStatement("replace into o_wshop_original_time_s10_log (start_time, " +
        "nickname, " +
        "distinct_id, " +
        "address_phone, " +
        "user_ip, " +
        "device_brand, " +
        "device_version, " +
        "view_path, " +
        "enter_type, " +
        "end_time) " +
        "values (?,?,?,?,?,?,?,?,?,?)")
      p.setString(1, value(0).toString)
      p.setString(2, value(1).toString)
      p.setString(3, value(2).toString)
      p.setString(4, value(3).toString)
      p.setString(5, value(4).toString)
      p.setString(6, value(5).toString)
      p.setString(7, value(6).toString)
      p.setString(8, value(7).toString)
      p.setString(9, value(8).toString)
      p.setString(10, value(9).toString)
      p.execute()
      // Close the per-row PreparedStatement so JDBC resources don't leak
      p.close()
    }

    override def close(errorOrNull: Throwable): Unit = {
      // Guard against a connection that was never successfully opened
      if (connection != null) {
        connection.close()
      }
    }
  }

}
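
Not part of the original code, but one way to narrow down where records disappear: a minimal StreamingQueryListener that logs, for every micro-batch, how many rows were read from Kafka and the offsets the batch ended at. The class name and log format below are just illustrative.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Register before start(): spark.streams.addListener(new BatchCountLogger)
class BatchCountLogger extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // numInputRows: rows pulled from Kafka in this micro-batch
    // endOffset: per-partition Kafka offsets this batch read up to
    println(s"batch=${p.batchId} rows=${p.numInputRows} " +
      s"endOffsets=${p.sources.map(_.endOffset).mkString("; ")}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    // A non-empty exception means the query died with an error rather than a clean stop
    println(s"query terminated, exception=${event.exception}")
  }
}

If the summed numInputRows already falls short of the offline count, the gap is upstream (Flume or Kafka); if it matches, the loss happens in the MySQL write.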


1 Answer

Michael_PK

2022-07-17

From what you describe, it's hard for me to tell which stage is going wrong.

Here is an approach:

1) Are you certain the offline and real-time pipelines actually receive the same number of records?

2) Did the real-time job ever hit an exception mid-stream? In particular, were the offsets really all committed correctly? In this code the offsets are managed by the framework itself (see the checkpoint sketch below).
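
On point 2: the query in the question sets no checkpointLocation, so unless a global checkpoint location is configured, Structured Streaming keeps its offsets in a temporary directory; after any restart it falls back to startingOffsets = latest and silently skips whatever was produced while the job was down, which could show up as a small daily shortfall. This is one possible source of gaps, not a confirmed diagnosis. A minimal sketch of persisting the offsets (the path is an assumed example):

    // Persist offsets and sink progress across restarts (directory is an assumed example)
    result.writeStream
      .foreach(new MySqlWriter)
      .option("checkpointLocation", "/home/hadoop/tmp/checkpoint/shop_topic_to_mysql")
      .start()
      .awaitTermination()

With a durable checkpoint, a restarted query resumes from the last committed offsets instead of "latest", and the REPLACE INTO in process() keeps any reprocessed rows from turning into duplicates, provided the target table has a suitable unique key.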
