Custom partition with the asynchronous blocking send approach
Source: 5-7 Producer custom Partition load balancing

weixin_慕仰1047700
2020-04-30
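Teacher, a question: with a custom partition, sending in the asynchronous blocking style fails and ends up in an infinite loop, while asynchronous send + callback does not. What is the logic behind this?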
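To make the two send styles in the question concrete, here is a minimal sketch that uses only the JDK. `fakeSend` is a hypothetical stand-in for a producer's `send()` and is not part of Kafka; it only illustrates where the calling thread waits in each style, not why the hang occurs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SendStyles {
    // Hypothetical stand-in for send(): the "ack" is produced on a background thread.
    static CompletableFuture<String> fakeSend(ExecutorService io, String key) {
        return CompletableFuture.supplyAsync(() -> "ack:" + key, io);
    }

    // "Asynchronous blocking": the caller waits for each result before sending the next record.
    // If a result never arrives, the loop never moves past that record.
    static List<String> sendBlocking(ExecutorService io, int count) {
        List<String> acks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            acks.add(fakeSend(io, "key-" + i).join()); // caller thread is parked here
        }
        return acks;
    }

    // "Asynchronous + callback": the caller hands over a callback and keeps sending.
    static List<String> sendWithCallback(ExecutorService io, int count) {
        List<String> acks = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            pending.add(fakeSend(io, "key-" + i).thenAccept(acks::add));
        }
        // Wait once at the end so the demo can return the collected results.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return acks;
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            System.out.println("blocking: " + sendBlocking(io, 3));
            System.out.println("callback: " + sendWithCallback(io, 3));
        } finally {
            io.shutdown();
        }
    }
}
```

In the blocking style a send that never completes stalls the loop at that record, whereas in the callback style the loop finishes and a missing completion only shows up later, for example when waiting at the end or closing.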
2 answers
小麻雀呀
2020-04-30
Bro, post the code so we can take a look.
weixin_慕仰1047700
Asker
2020-04-30
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// TOPIC_NAME is a constant defined elsewhere in the enclosing class.
public static void asynchronousSendWithCustomizedPartition() throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.151.80:9092");
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // Custom partitioner class
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.demo.producer.PartitionSample");

    // main object
    Producer<String, String> producer = new KafkaProducer<String, String>(properties);

    // message object - ProducerRecord
    for (int i = 0; i < 10; i++) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC_NAME, "key-" + i, "value-" + i);
        Future<RecordMetadata> send = producer.send(record);
        // Blocks until the send completes (the "asynchronous blocking" style)
        RecordMetadata recordMetadata = send.get();
        System.out.println("key-" + i + ", partition: " + recordMetadata.partition()
                + " , offset: " + recordMetadata.offset());
    }

    // close producer
    producer.close();
}
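The `PartitionSample` class configured above is not shown in the thread. As a rough illustration of what a key-based partitioner typically computes, here is a hypothetical, standalone sketch. `choosePartition` and its parameters are assumptions for illustration, written as plain Java rather than against Kafka's `Partitioner` interface so that it runs on its own. In a real partitioner the partition count would normally come from the cluster metadata (for example `cluster.partitionsForTopic(topic).size()`) rather than a hard-coded number.

```java
final class PartitionSketch {
    // Hypothetical helper: map keys shaped like "key-7" to a partition index
    // that is always within [0, numPartitions).
    static int choosePartition(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int dash = key.lastIndexOf('-');
        int n;
        try {
            // Use the numeric suffix after the last '-' when there is one.
            n = Integer.parseInt(key.substring(dash + 1));
        } catch (NumberFormatException e) {
            // Otherwise fall back to the key's hash code.
            n = key.hashCode();
        }
        // floorMod keeps the result non-negative even for negative inputs.
        return Math.floorMod(n, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 2; // assumed partition count for the demo
        for (int i = 0; i < 4; i++) {
            String key = "key-" + i;
            System.out.println(key + " -> partition " + choosePartition(key, numPartitions));
        }
    }
}
```

Deriving the index from the actual partition count means the result always refers to a partition that exists, which is worth checking whenever a custom partitioner is combined with a blocking `get()`.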