kafka异步生产者提交数据的问题

来源:6-4 Golang使用kafka的正确姿势 (一)

404_

2023-02-10

我发现在异步提交的时候,必须使用time.sleep一段时间,这个时候消费者才能消费到数据。

func produceAsyncMsg() {
	err := mq.InitAsyncKafkaProducer(mq.DefaultKafkaAsyncProducer, hosts, nil)
	if err != nil {
		fmt.Println("InitAsyncKafkaProducer error", err)
	}
	msg := Msg{
		ID:       1,
		Name:     "test name async",
		CreateAt: time.Now().Unix(),
	}
	msgBody, _ := json.Marshal(msg)

	err = mq.GetKafkaAsyncProducer(mq.DefaultKafkaAsyncProducer).Send(&sarama.ProducerMessage{Topic: topic, Value: mq.KafkaMsgValueEncoder(msgBody)})
	if err != nil {
		fmt.Println("Send msg error", err)
	} else {
		fmt.Println("Send msg success")
	}
	//异步提交需要等待
	time.Sleep(3 * time.Second)  这边就必须的sleep,否则消费者消费不到
}

我用得是这种消费方式

./kafka-console-consumer.sh --bootstrap-server 192.168.2.45:9092,192.168.2.46:9092,192.168.2.47:9092 --topic second

我感觉这种sleep的方式不太优雅,请问老师有什么优化的方案吗?

写回答

1回答

少林码僧

2023-02-10

课程中介绍过,我们的服务都是常驻进程,通过在main函数中使用select{}或者shutdown.NewHook().Close()方法来监听退出信号到达main进程常驻的效果,这样进程不退出就可以达到一直发送或接收消息的效果

0
0

海量数据高并发场景,构建Go+ES8企业级搜索微服务

全新 ES8 配合技术组件,实现高性能搜索

267 学习 · 54 问题

查看课程