自定义partition调用异步阻塞这样的方式

来源:5-7 Producer自定义Partition负载均衡

weixin_慕仰1047700

2020-04-30

老师请问下,如果自定义partition,调用异步阻塞这样的方式,就会出错,会出现死循环;如果是异步+callback则不会,请问下这里面的逻辑是为什么呢?

写回答

2回答

小麻雀呀

2020-04-30

老哥,代码贴出来看看

1
11
weixin_慕仰1047700
回复
小麻雀呀
刚刚重新写了一遍,就跑正常了,可能是之前写的时候哪里写错了没看到
2020-05-01
共11条回复

weixin_慕仰1047700

提问者

2020-04-30

public static void asynchronousSendWithCustomizedPartition() throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.151.80:9092");
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, " 33554432");

    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.demo.producer.PartitionSample");

    //main object
    Producer<String, String> producer = new KafkaProducer<String, String>(properties);

    //message object - ProducerRecord
    for(int i=0; i<10; i++){
        ProducerRecord<String, String> record
                = new ProducerRecord<>(TOPIC_NAME, "key-"+i, "value-"+i);

        Future<RecordMetadata> send = producer.send(record);
        RecordMetadata recordMetadata = send.get();
        System.out.println("key-"+i+", "+"partition: "+ recordMetadata.partition()+" , offset: "+             recordMetadata.offset());     }

    //close producer
    producer.close();
}

0
0

Kafka多维度系统精讲,从入门到实战开发

系统讲解Kafka,实战结合,让你成为使用Kafka的高手

896 学习 · 237 问题

查看课程