同步生产者发送多条消息的代码逻辑是不是有问题?
来源:6-5 Golang使用kafka的正确姿势 (二)

404_
2023-11-28
当执行到 errs = (*syncProducer.SyncProducer).SendMessages(mses).(sarama.ProducerErrors) 这一行的时候,是不是又会进入到
func (syncProducer *SyncProducer) SendMessages(mses []*sarama.ProducerMessage) (errs sarama.ProducerErrors) 这个函数中?这不就循环了吗?
func (syncProducer *SyncProducer) SendMessages(mses []*sarama.ProducerMessage) (errs sarama.ProducerErrors) {
if syncProducer.Status != KafkaProducerConnected {
return append(errs, &sarama.ProducerError{Err: errors.New("kafka syncProducer " + syncProducer.Status)})
}
errs = (*syncProducer.SyncProducer).SendMessages(mses).(sarama.ProducerErrors)
for _, err := range errs {
//触发重连。// 如果是因为kafka连接异常导致的发送错误,则进行生产者与kafka的重新连接操作
if errors.Is(err, sarama.ErrBrokerNotAvailable) {
syncProducer.StatusLock.Lock()
if syncProducer.Status == KafkaProducerConnected {
syncProducer.Status = KafkaProducerDisconnected
syncProducer.ReConnect <- true
}
syncProducer.StatusLock.Unlock()
}
}
return
}
我在测试这个方法的时候发现也是报错了。错误如下:panic: interface conversion: error is nil, not sarama.ProducerErrors
测试代码是:
msgs := make([]*sarama.ProducerMessage, 0)
for i := 0; i < 3; i++ {
msg := Msg{
ID: int64(i),
Name: fmt.Sprintf("user%d", i),
CreateAt: time.Now().Unix(),
}
msgBody, _ := json.Marshal(msg)
msgs = append(msgs, &sarama.ProducerMessage{
Topic: topic,
Value: mq.KafkaMsgValueEncoder(msgBody),
})
}
mq.GetKafkaSyncProducer(mq.DefaultKafkaSyncProducer).SendMessages(msgs)
写回答
1回答
-
如果是递归调用,则方式如下:
func (syncProducer *SyncProducer) SendMessages(mses []*sarama.ProducerMessage) (errs sarama.ProducerErrors) { syncProducer.SendMessages() }
这里是调用syncProducer.SyncProducer对象的SendMessages方法,而syncProducer.SyncProducer是 github.com/Shopify/sarama 这个sdk中的对象
这里报错是因为发送之后如果没有错误是没法直接将error转成sarama.ProducerErrors的。可以加个判断:
func (syncProducer *SyncProducer) SendMessages(mses []*sarama.ProducerMessage) sarama.ProducerErrors { if syncProducer.Status != KafkaProducerConnected { return sarama.ProducerErrors{&sarama.ProducerError{Err: errors.New("kafka syncProducer " + syncProducer.Status)}} } errs := (*syncProducer.SyncProducer).SendMessages(mses) if errs != nil { retErrs, ok := errs.(sarama.ProducerErrors) if !ok { KafkaStdLogger.Println("retErrs convert error:", zap.Error(errs)) return retErrs } for _, err := range retErrs { //触发重连 if errors.Is(err, sarama.ErrBrokerNotAvailable) { syncProducer.StatusLock.Lock() if syncProducer.Status == KafkaProducerConnected { syncProducer.Status = KafkaProducerDisconnected syncProducer.ReConnect <- true } syncProducer.StatusLock.Unlock() } } return retErrs } return nil }
00
相似问题