Teacher, a question about duplicated log output
Source: 5-7 Producer custom Partition load balancing

小麻雀呀
2020-04-29
The partition method in the SamplePartition class:
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // Parse the last character of the key (e.g. "k-3" -> 3)
    String keyStr = String.valueOf(key);
    Integer keyInt = Integer.valueOf(keyStr.substring(keyStr.length() - 1));
    log.info("keyStr:{},keyInt:{}", keyStr, keyInt);
    // Every record is sent to partition 0 here
    return 0;
}
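For reference, the key parsing above can be tried without a broker. The following is only a minimal sketch: the class name `KeySuffixPartitionSketch`, the method `partitionFor`, and the modulo step are assumptions made for illustration (the posted method always returns 0), and the code is not part of the Kafka API.

```java
// Hypothetical helper for illustration; not from the original post or the Kafka API.
final class KeySuffixPartitionSketch {

    // Maps a key such as "k-3" to a partition index using its last character.
    // Falls back to partition 0 when the key is empty or does not end in a digit.
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null || key.isEmpty()) {
            return 0;
        }
        // Character.digit returns -1 when the character is not a base-10 digit
        int keyInt = Character.digit(key.charAt(key.length() - 1), 10);
        if (keyInt < 0) {
            return 0;
        }
        // Assumed balancing rule: spread keys by digit modulo partition count
        return keyInt % numPartitions;
    }

    public static void main(String[] args) {
        // Same keys as the send loop in the post: k-0 .. k-5, with 2 assumed partitions
        for (int i = 0; i <= 5; i++) {
            String key = "k-" + i;
            System.out.println(key + " -> partition " + partitionFor(key, 2));
        }
    }
}
```

Inside a real Partitioner, the partition count would presumably come from the cluster metadata, for example `cluster.partitionsForTopic(topic).size()`, rather than from a hard-coded argument.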
The asynchronous send method with a callback:
public static void producerSendByCallBackAndMyPartition() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.104:9092");
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.xieth.kafka.demo.producer.SamplePartition");
    // Main Producer object
    Producer<String, String> producer = new KafkaProducer<>(properties);
    for (int i = 0; i <= 5; i++) {
        // Message object
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "k-" + i, "v-" + i);
        producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Report a failed send instead of reading its metadata
                if (exception != null) {
                    exception.printStackTrace();
                    return;
                }
                System.out.println("partition : " + metadata.partition()
                        + ", offset : " + metadata.offset());
            }
        });
    }
    // Release resources
    producer.close();
}
Log output
2 answers
-
Allen
2020-05-01
How are you printing this log? Please also post a screenshot of the place where the log is printed.
-
小麻雀呀
Asker
2020-05-01
Teacher, here is a screenshot of the logging code.