使用@RocketMQMessageListener拿不到消息

来源:9-10 Spring消息编程模型02-编写消费者

朱小悬

2021-08-05

老师,我是用@RocketMQMessageListener来标明消费者,但是这个消费者一会能消费到消息,一会不能消费到消息。

然后我去控制台看了不能被消费的消息状态,trackType是NOT_CONSUME_YET。

请问这是什么情况呀?

代码如下:

@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer.group.tenant-info-increment-sync}",
        topic = "${csp.product}_MAIN_INFO",
        selectorExpression = "increment",
        consumeThreadMax = 10,
        messageModel = MessageModel.CLUSTERING
)
public class TenantInfoIncrementSyncConsumer implements RocketMQListener<List<MainInfo>>, RocketMQPushConsumerLifecycleListener {

    /**
     * tenantInfoService
     */
    @Autowired
    private TenantInfoService tenantInfoService;

    @Override
    public void onMessage(List<MainInfo> mainInfos) {
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {

        // 消费者参数设置:从最尾消费,每次拉取一条数据
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setInstanceName("TenantInfoIncrementSyncConsumer");
        consumer.setPullBatchSize(10);
        consumer.setMaxReconsumeTimes(3);

        consumer.registerMessageListener((MessageListenerConcurrently) (messages, consumeConcurrentlyContext) -> {

            if (CollectionUtils.isNotEmpty(messages)) {

                try {

                    for (MessageExt message : messages) {

                        // 将拉取到的消息转化成指定格式
                        log.info("拉取到的消息为:" + new String(message.getBody()));

                        List<MainInfo> mainInfos = JSONUtil.toList(new JSONArray(new String(message.getBody())), MainInfo.class);

                        if (CollectionUtils.isNotEmpty(mainInfos)) {

                            int size = mainInfos.size();

                            List<String> incrementMainIds = new ArrayList<>(size);
                            List<String> decrementMainIds = new ArrayList<>(size);
                            List<String> disabledMainIds = new ArrayList<>(size);

                            mainInfos.forEach(mainInfo -> {
                                if (mainInfo.getStatus() == 1) {
                                    incrementMainIds.add(mainInfo.getMainId());
                                } else if (mainInfo.getStatus() == 0) {
                                    decrementMainIds.add(mainInfo.getMainId());
                                } else {
                                    disabledMainIds.add(mainInfo.getMainId());
                                }
                            });

                            if (CollectionUtils.isNotEmpty(incrementMainIds)) {

                                Map<String, List<String>> mainInfoMap = tenantInfoService.exist(incrementMainIds);

                                if (MapUtils.isNotEmpty(mainInfoMap)) {
                                    if (CollectionUtils.isNotEmpty(mainInfoMap.get("0"))) {
                                        List<TenantInfo> tenantInfos = new ArrayList<>();
                                        for (String mainId : mainInfoMap.get("0")) {
                                            TenantInfo tenantInfo = new TenantInfo();
                                            tenantInfo.setId(mainId);
                                            tenantInfo.setTenantStatus(1);
                                            tenantInfos.add(tenantInfo);
                                        }
                                        tenantInfoService.addBatch(tenantInfos);
                                    }
                                    if (CollectionUtils.isNotEmpty(mainInfoMap.get("1"))) {
                                        tenantInfoService.updateStatusBatch(mainInfoMap.get("1"), 1);
                                    }
                                }
                            }

                            if (CollectionUtils.isNotEmpty(decrementMainIds)) {
                                tenantInfoService.updateStatusBatch(decrementMainIds, 0);
                            }

                            if (CollectionUtils.isNotEmpty(disabledMainIds)) {
                                tenantInfoService.updateStatusBatch(disabledMainIds, 10);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("消费数据失败", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
    }
}
写回答

1回答

大目

2021-08-13

您好,看您的描述,有点像是命中了负载均衡机制。

建议检查是否有多个消费者实例。
当有多个消费者时,会rocketmq server采用负载均衡机制投递到其中一个实例去消费。

0
2
大目
回复
朱小悬
目测你的代码没有问题,请问能提供完整代码吗?我来看看。
2021-08-30
共2条回复

Spring Cloud Alibaba微服务从入门到进阶

面向未来微服务:熟练掌握Spring Cloud Alibaba

3085 学习 · 1324 问题

查看课程