同步生产者发送多条消息的代码逻辑是不是有问题?

来源:6-5 Golang使用kafka的正确姿势 (二)

404_

2023-11-28

当执行到 errs = (*syncProducer.SyncProducer).SendMessages(mses).(sarama.ProducerErrors) 这一行的时候,是不是又会进入到
func (syncProducer *SyncProducer) SendMessages(mses []*sarama.ProducerMessage) (errs sarama.ProducerErrors) 这个函数中?这不就循环了吗?

func (syncProducer *SyncProducer) SendMessages(mses []*sarama.ProducerMessage) (errs sarama.ProducerErrors) {
	if syncProducer.Status != KafkaProducerConnected {
		return append(errs, &sarama.ProducerError{Err: errors.New("kafka syncProducer " + syncProducer.Status)})
	}
	errs = (*syncProducer.SyncProducer).SendMessages(mses).(sarama.ProducerErrors)
	for _, err := range errs {
		//触发重连。// 如果是因为kafka连接异常导致的发送错误,则进行生产者与kafka的重新连接操作
		if errors.Is(err, sarama.ErrBrokerNotAvailable) {
			syncProducer.StatusLock.Lock()
			if syncProducer.Status == KafkaProducerConnected {
				syncProducer.Status = KafkaProducerDisconnected
				syncProducer.ReConnect <- true
			}
			syncProducer.StatusLock.Unlock()
		}
	}
	return
}

我在测试这个方法的时候发现也是报错了。错误如下:panic: interface conversion: error is nil, not sarama.ProducerErrors
测试代码是:

	msgs := make([]*sarama.ProducerMessage, 0)
	for i := 0; i < 3; i++ {
		msg := Msg{
			ID:       int64(i),
			Name:     fmt.Sprintf("user%d", i),
			CreateAt: time.Now().Unix(),
		}
		msgBody, _ := json.Marshal(msg)
		msgs = append(msgs, &sarama.ProducerMessage{
			Topic: topic,
			Value: mq.KafkaMsgValueEncoder(msgBody),
		})
	}

	mq.GetKafkaSyncProducer(mq.DefaultKafkaSyncProducer).SendMessages(msgs)

写回答

1回答

少林码僧

2023-11-28

如果是递归调用,则方式如下:

func (syncProducer *SyncProducer) SendMessages(mses []*sarama.ProducerMessage) (errs sarama.ProducerErrors) {
syncProducer.SendMessages()
}

这里是调用syncProducer.SyncProducer对象的SendMessages方法,而syncProducer.SyncProducer是 github.com/Shopify/sarama 这个sdk中的对象

这里报错是因为发送之后如果没有错误是没法直接将error转成sarama.ProducerErrors的。可以加个判断:

func (syncProducer *SyncProducer) SendMessages(mses []*sarama.ProducerMessage) sarama.ProducerErrors {
    if syncProducer.Status != KafkaProducerConnected {
       return sarama.ProducerErrors{&sarama.ProducerError{Err: errors.New("kafka syncProducer " + syncProducer.Status)}}
    }
    errs := (*syncProducer.SyncProducer).SendMessages(mses)
    if errs != nil {
       retErrs, ok := errs.(sarama.ProducerErrors)
       if !ok {
          KafkaStdLogger.Println("retErrs convert error:", zap.Error(errs))
          return retErrs
       }
       for _, err := range retErrs {
          //触发重连
          if errors.Is(err, sarama.ErrBrokerNotAvailable) {
             syncProducer.StatusLock.Lock()
             if syncProducer.Status == KafkaProducerConnected {
                syncProducer.Status = KafkaProducerDisconnected
                syncProducer.ReConnect <- true
             }
             syncProducer.StatusLock.Unlock()
          }
       }
       return retErrs
    }
    return nil
}





0
0

海量数据高并发场景,构建Go+ES8企业级搜索微服务

全新 ES8 配合技术组件,实现高性能搜索

267 学习 · 54 问题

查看课程