生产者.send()和生产者.send().get()

来源:5-1 Producer章节介绍

乃好

2020-05-26

老师您好,我编写了一个客户端,生产者发送数据,并且服务器上使用bin/kafka-console-consumer.sh命令,观察是否有数据过来,发现了一个奇怪的现象

下面是我的代码:

package com.gx.kafkastudy.掘金.生产者;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerAnalysis
{
    public static final String brokerList = "192.168.1.191:9092,192.168.1.65:9092,192.168.1.82:9092";
    public static final String topic = "topic-demo";

    public static Properties initConfig()
    {
        Properties pros = new Properties();
        pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        pros.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pros.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pros.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        pros.put(ProducerConfig.RETRIES_CONFIG, 10);
        return pros;
    }

    public static void main(String[] args)
    {
        Properties pros = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pros);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello, Kafka!");
        try
        {
            //这样写,消费者接收不到数据
            producer.send(record);

            //这样写,消费者也接收不到数据
            producer.send(record, new Callback()
            {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e)
                {
                    //并且也打印不出任何内容
                    System.out.println(recordMetadata.topic());
                }
            });



            //这样写,消费者可以接收到数据
            producer.send(record).get();

            //这样写,消费者也可以接收到数据
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata recordMetadata = future.get();

            //这样写,消费者也可以接收到数据
            producer.send(record, new Callback()
            {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e)
                {
                    //也可以打印出内容
                    System.out.println(recordMetadata.topic());
                }
            }).get();

        }
        catch (Exception e)
        {
            e.printStackTrace();
        }

    }
}

一开始我使用的是send,但是发现并没有数据得到,后来又加了一个get,可以得到了,经过我的反复测试,还真是这样,唯一的区别就是后面有没有get方法,按理说这不影响消息的发送呀,get只是可以获取发送出去的数据而已,这是什么原因呢?


写回答

1回答

Allen

2020-05-26

你只不过把异步变同步了, 把Producer的配置按照课程里的内容配置一下, 异步就能收到了

1
2
乃好
破案了:https://blog.csdn.net/QYHuiiQ/article/details/88757209
2020-05-26
共2条回复

Kafka多维度系统精讲,从入门到实战开发

系统讲解Kafka,实战结合,让你成为使用Kafka的高手

896 学习 · 237 问题

查看课程