kafka异步生产者提交数据的问题
来源:6-4 Golang使用kafka的正确姿势 (一)

404_
2023-02-10
我发现在异步提交的时候,必须使用time.sleep一段时间,这个时候消费者才能消费到数据。
func produceAsyncMsg() {
err := mq.InitAsyncKafkaProducer(mq.DefaultKafkaAsyncProducer, hosts, nil)
if err != nil {
fmt.Println("InitAsyncKafkaProducer error", err)
}
msg := Msg{
ID: 1,
Name: "test name async",
CreateAt: time.Now().Unix(),
}
msgBody, _ := json.Marshal(msg)
err = mq.GetKafkaAsyncProducer(mq.DefaultKafkaAsyncProducer).Send(&sarama.ProducerMessage{Topic: topic, Value: mq.KafkaMsgValueEncoder(msgBody)})
if err != nil {
fmt.Println("Send msg error", err)
} else {
fmt.Println("Send msg success")
}
//异步提交需要等待
time.Sleep(3 * time.Second) 这边就必须的sleep,否则消费者消费不到
}
我用得是这种消费方式
./kafka-console-consumer.sh --bootstrap-server 192.168.2.45:9092,192.168.2.46:9092,192.168.2.47:9092 --topic second
我感觉这种sleep的方式不太优雅,请问老师有什么优化的方案吗?
写回答
1回答
-
少林码僧
2023-02-10
课程中介绍过,我们的服务都是常驻进程,通过在main函数中使用select{}或者shutdown.NewHook().Close()方法来监听退出信号到达main进程常驻的效果,这样进程不退出就可以达到一直发送或接收消息的效果
00
相似问题