Kafka on a cloud host connected to SSS (Structured Streaming): two errors.

Source: 8-10 Connecting to a Kafka data source

酱汁怪兽

2022-04-18

Description:
Running services:
dfs, yarn, zookeeper, flume, kafka, consumer, master

[root@hadoop000 logtest]# jps -m
512 Kafka /home/hadoop/app/kafka_2.12-2.5.0/config/server-local-test3.properties
32577 Kafka /home/hadoop/app/kafka_2.12-2.5.0/config/server-local-test2.properties
30308 SecondaryNameNode
30151 DataNode
616 ConsoleConsumer --bootstrap-server hadoop000:9092,hadoop000:9093,hadoop000:9094 --topic localtest-replicated-topic
1001 Application --conf-file /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/config/flume-kafka-logger.conf --name a1
31724 QuorumPeerMain /home/hadoop/app/zookeeper-3.4.5-cdh5.16.2/bin/../conf/zoo.cfg
30029 NameNode
32175 Kafka /home/hadoop/app/kafka_2.12-2.5.0/config/server-local-test1.properties
5206 Jps -m
30779 NodeManager
30492 ResourceManager
1247 Master --host hadoop000 --port 7077 --webui-port 8080

The local Structured Streaming program consumes from Kafka.
Error 1:

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Error 2:

Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)
	... 43 more
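Error 2 is the root cause of error 1: the KafkaConsumer constructor fails because none of the entries in `bootstrap.servers` resolve to an address from the machine running the Spark program. A quick way to check resolvability from the client machine (a minimal sketch; the host names below are only illustrations, not taken from the question):

```shell
# Check whether a broker host name resolves from this machine.
# "No resolvable bootstrap urls" means every entry failed this check.
check_resolvable() {
  if getent hosts "$1" > /dev/null 2>&1; then
    echo "$1: resolvable"
  else
    echo "$1: not resolvable"
  fi
}

check_resolvable "localhost"            # resolves on any machine
check_resolvable "no-such-host.invalid" # .invalid never resolves (RFC 6761)
```

If `hadoop000` prints "not resolvable" when run on the developer machine (rather than on the cloud host itself), the client cannot reach the brokers under that name, which matches the fix in the accepted answer.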

Code:

package com.imooc.sss

import org.apache.spark.sql.SparkSession
object AccessKafkaData {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    kafkaSource(spark)

    def kafkaSource(spark: SparkSession): Unit = {
      import spark.implicits._
      spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "hadoop000:9092,hadoop000:9093,hadoop000:9094") // no spaces between entries
        .option("subscribe", "localtest-replicated-topic")
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as[String].flatMap(_.split("\t")) // tab is "\t", not "/t"
        .writeStream
        .format("console")
        .start()
        .awaitTermination()
    }
  }
}


2 Answers

酱汁怪兽

(original poster)

2022-04-19

PK, solved:

1. In the Scala code, replace the cloud host's hostname with the internal IP.

2. Modify all three of Kafka's server.properties files:

# Allow connections from outside
listeners=PLAINTEXT://0.0.0.0:9092

# Externally advertised address
advertised.listeners=PLAINTEXT://121.201.64.12:9092
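For reference, the same two settings need to appear in each of the three broker config files, each with its own port. This is a sketch under two assumptions: that the three brokers listen on 9092/9093/9094 as suggested by the jps output above, and that 121.201.64.12 is the host's public IP from the answer:

```properties
# server-local-test1.properties (broker on 9092)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://121.201.64.12:9092

# server-local-test2.properties (broker on 9093)
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://121.201.64.12:9093

# server-local-test3.properties (broker on 9094)
listeners=PLAINTEXT://0.0.0.0:9094
advertised.listeners=PLAINTEXT://121.201.64.12:9094
```

Binding `listeners` to 0.0.0.0 makes the broker accept connections on all interfaces, while `advertised.listeners` is the address the broker returns to clients in metadata responses, so it must be reachable from the client machine.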


酱汁怪兽

(original poster)

2022-04-18

[root@hadoop000 ~]# kafka-topics.sh --list --bootstrap-server hadoop000:9092
__consumer_offsets
localtest-replicated-topic
[root@hadoop000 ~]# kafka-topics.sh --list --bootstrap-server hadoop000:9093
__consumer_offsets
localtest-replicated-topic
[root@hadoop000 ~]# kafka-topics.sh --list --bootstrap-server hadoop000:9094
__consumer_offsets
localtest-replicated-topic


Course: Spark3 Real-Time Processing - Streaming + Structured Streaming in Action
