客服端无法接收数据,但是linux上能接收到

来源:4-10 -Kafka Consumer Java API编程

96年的nash

2019-09-28

package com.imooc.spark.kafka;

/**
 * Kafka常用配置文件
 */
public class KafkaProperties {

    public static final String ZK = "10.1.7.132:2181";

    public static final String TOPIC = "hello_topic";

    public static final String BROKER_LIST = "10.1.7.132:9092";

    public static final String GROUP_ID = "test_group1";
}

package com.imooc.spark.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka消费者
 */
public class KafkaConsumer extends Thread{

    private String topic;

    public KafkaConsumer(String topic) {
        this.topic = topic;
    }


    private ConsumerConnector createConnector(){
        Properties properties = new Properties();
        properties.put("zookeeper.connect", KafkaProperties.ZK);
        properties.put("group.id",KafkaProperties.GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConnector();

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
//        topicCountMap.put(topic2, 1);
//        topicCountMap.put(topic3, 1);

        // String: topic
        // List<KafkaStream<byte[], byte[]>>  对应的数据流
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream =  consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0);   //获取我们每次接收到的暑假

        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();


        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("rec: " + message);
        }
    }
}

我是用win10结合虚拟机开发的,OOTB环境搭建的没问题,防火墙关上了,之前的Producer时,linux端的Producer可以接收到win10的idea端发送过去的数据。但本节中,linux上的producer依然可以接收到,但的win10的idea环境却报了以下错误:
![图片描述
图片描述
linux上的zookeeper配置是按照课程上配的
图片描述
实在查不出什么错误了,请老师指导

写回答

3回答

Michael_PK

2019-09-28

你把server那个配置文件里,有个什么listener的打开再试试

0
2
96年的nash
非常感谢!
2019-09-29
共2条回复

Vickey是码农

2020-03-07

兄弟,你这个问题怎么解决的?谢谢

0
0

96年的nash

提问者

2019-09-28

//img.mukewang.com/szimg/5d8f4bcf090ef07a08420513.jpg

老师您说的是这个listeners吗

0
1
Michael_PK
是的,重启下kafka,然后创建一个新topic试试
2019-09-28
共1条回复

Spark Streaming实时流处理项目实战

Flume+Kafka+Spark Streaming 构建通用实时流处理平台

1404 学习 · 571 问题

查看课程