stream消费消息
来源:10-2 Spring Cloud Gateway是什么?优缺点分析

strictShu
2019-11-17
- 测试了两次消费过程,执行成功,添加积分成功. 然后程序一直运行,没有关闭,过了大约15分钟,发现控制台又打印了 3次 接收到消息,xxxx 但是没有执行userService.addBonus(message.getPayload());
- 而且还报了String 转 UserAddBonusMsgDTO的错误.
- 消费完的消息在控制台 消息 -->主题 中还能查看到,是正确的吗?
代码:
public void receive(Message<UserAddBonusMsgDTO> message) {
log.info("接收到消息,{}",message.getPayload());
userService.addBonus(message.getPayload());
}
写回答
1回答
-
您好,消费完成的消息,在控制台上查询时,理应是已消费(CONSUMED)状态。如果不是的话,那一般是您的消费者有问题导致的。
举个例子:消费者消费的时候,抛出异常,那么消息就不会标记为已消费(CONSUMED)状态,而是继续积压在RocketMQ Server端,RocketMQ会对这些消息进行重试,默认情况下会重试多达15次。
就您的描述,以及贴出的日志来看,很像是消费失败所导致的。您日志截出来的“接收到消息”,只能说明消费者能够正常收取消息,但并不代表消费消息能够成功。消费消息至少意味着,您消费消息的阶段,从开始到消费完成,不能抛出异常。
我猜测有两种可能性:
消费者处理的逻辑存在异常
某一次在测试的时候,您只启动了消息生产者(content-center),没有启动消息消费者(user-center)于是,生产者生产的消息,在RocketMQ里积压了。后来启动user-center的时候,也会去进行消费。
我并没有遇到过您遇到的问题,可以排查一下哈。
00
相似问题