Connecting Kafka on a cloud host to SSS (Spark Structured Streaming) produces two errors.
Source: 8-10 Connecting to a Kafka data source

酱汁怪兽
2022-04-18
Description:
Services already running:
dfs, yarn, zookeeper, flume, kafka, consumer, master
[root@hadoop000 logtest]# jps -m
512 Kafka /home/hadoop/app/kafka_2.12-2.5.0/config/server-local-test3.properties
32577 Kafka /home/hadoop/app/kafka_2.12-2.5.0/config/server-local-test2.properties
30308 SecondaryNameNode
30151 DataNode
616 ConsoleConsumer --bootstrap-server hadoop000:9092,hadoop000:9093,hadoop000:9094 --topic localtest-replicated-topic
1001 Application --conf-file /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/config/flume-kafka-logger.conf --name a1
31724 QuorumPeerMain /home/hadoop/app/zookeeper-3.4.5-cdh5.16.2/bin/../conf/zoo.cfg
30029 NameNode
32175 Kafka /home/hadoop/app/kafka_2.12-2.5.0/config/server-local-test1.properties
5206 Jps -m
30779 NodeManager
30492 ResourceManager
1247 Master --host hadoop000 --port 7077 --webui-port 8080
The SSS program runs locally and connects to Kafka on the cloud host as a consumer.
Error 1:
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Error 2:
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)
... 43 more
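For context, the Kafka client raises this ConfigException when none of the host:port entries in bootstrap.servers resolves to an address on the machine running the consumer. A minimal sketch of a check that can be run locally before starting the job, using only java.net; the object and method names (BootstrapCheck, resolvable) are hypothetical, and the logic only approximates the client's own validation:

```scala
import java.net.InetSocketAddress

object BootstrapCheck {
  // Splits a bootstrap.servers string and reports, per entry, whether its host resolves
  // on this machine. This approximates the Kafka client's check; it is not the client's code.
  def resolvable(bootstrapServers: String): Seq[(String, Boolean)] =
    bootstrapServers.split(",").map(_.trim).filter(_.nonEmpty).toSeq.map { entry =>
      val idx = entry.lastIndexOf(':')
      require(idx > 0, s"expected host:port but got '$entry'")
      val host = entry.substring(0, idx)
      val port = entry.substring(idx + 1).toInt
      // InetSocketAddress attempts name resolution in its constructor.
      entry -> !new InetSocketAddress(host, port).isUnresolved
    }

  def main(args: Array[String]): Unit =
    resolvable("hadoop000:9092, hadoop000:9093, hadoop000:9094").foreach {
      case (entry, ok) => println(s"$entry resolvable=$ok")
    }
}
```

If every entry reports resolvable=false, the local machine does not know the name hadoop000, which would be consistent with the error above.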
Code:
package com.imooc.sss

import org.apache.spark.sql.SparkSession

object AccessKafkaData {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName(this.getClass.getName).getOrCreate()

    kafkaSource(spark)
  }

  // Read the topic as a stream, split each record on tabs, and print the result to the console.
  def kafkaSource(spark: SparkSession): Unit = {
    import spark.implicits._

    spark
      .readStream
      .format("kafka")
      // These hostnames must be resolvable from the machine that runs this program.
      .option("kafka.bootstrap.servers", "hadoop000:9092, hadoop000:9093, hadoop000:9094")
      .option("subscribe", "localtest-replicated-topic")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String].flatMap(_.split("\t"))
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
2 answers
酱汁怪兽
Asker
2022-04-19
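PK, this is solved.
1. In the Scala code, replace the cloud host's hostname with its internal IP.
2. Edit the three Kafka server.properties files:
# Allow connections from outside the host
listeners=PLAINTEXT://0.0.0.0:9092
# Address advertised to external clients
advertised.listeners=PLAINTEXT://121.201.64.12:9092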
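The two steps above can be sketched as small helpers. This is only an illustration under assumptions: the object and function names are hypothetical, it assumes each of the three brokers keeps its own port (9092, 9093, 9094, as in the consumer command in the question), and it reuses 121.201.64.12 simply because it is the only address shown here; substitute whichever address your client can actually reach.

```scala
object KafkaAddressSketch {
  // Builds a bootstrap.servers value from one host and several broker ports.
  def bootstrapServers(host: String, ports: Seq[Int]): String =
    ports.map(p => s"$host:$p").mkString(",")

  // Listener lines for one broker's properties file, following the two settings above.
  // Assumption: each broker uses its own port in both lines.
  def listenerLines(advertisedHost: String, port: Int): Seq[String] = Seq(
    s"listeners=PLAINTEXT://0.0.0.0:$port",
    s"advertised.listeners=PLAINTEXT://$advertisedHost:$port"
  )

  def main(args: Array[String]): Unit = {
    val ports = Seq(9092, 9093, 9094)
    val host = "121.201.64.12" // illustrative address taken from the answer
    println(bootstrapServers(host, ports))
    ports.foreach(p => listenerLines(host, p).foreach(println))
  }
}
```

The string returned by bootstrapServers is what would be passed to .option("kafka.bootstrap.servers", ...) in the question's code. The brokers need a restart before changed listener settings take effect.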
酱汁怪兽
Asker
2022-04-18
[root@hadoop000 ~]# kafka-topics.sh --list --bootstrap-server hadoop000:9092
__consumer_offsets
localtest-replicated-topic
[root@hadoop000 ~]# kafka-topics.sh --list --bootstrap-server hadoop000:9093
__consumer_offsets
localtest-replicated-topic
[root@hadoop000 ~]# kafka-topics.sh --list --bootstrap-server hadoop000:9094
__consumer_offsets
localtest-replicated-topic