关于prefrech的疑问。

来源:4-7 消费端限流机制

慕沐3053333

2021-05-14

假如设置了prefrech为10,消费者1订阅的是队列a。队列a把10条消息推给消费者1,那消费者1对于这10条消息是在内存里也有一个队列用于存储这10条消息,然后一个一个消费处理,还是会用多个线程并发地去处理呢?

所以我的疑问简而言之就是两个

1、消费者内存是否有一个队列用于存储消息?

2、是一个一个处理消息,还是用多个线程并发地去处理?

写回答

2回答

Moody

2021-05-14

我又查了一下代码,之前回答的应该是说错了-_-,我再重新讲下:

  1. 不管是否开启限流,不论是不是用SpringAMQP,本地都有一个缓存队列,缓存收到但是还未处理的消息。

  2. 不开启限流时,消息确实已经推到消费者内存中的的缓存队列里了。

  3. 开启限流时,只会推有限个数的消息到消费者内存中的的缓存队列里。


1
1
慕沐3053333
第2点,不开启限流,MQ是有多少消息就推多少消息到消费者内存中的的缓存队列里吗?这样高并发场景下岂不是会导致消费者内存爆满?
2021-05-14
共1条回复

Moody

2021-05-14

  1. 现在没有,下一章就有了,具体看5-8节。SpringAMQP在本地做了一个队列用于缓存消息。

  2. 目前是你自己控制的,业务不做多线程就是单线程的。第五章用SpringAMQP就可以自己配置线程数。

1
5
慕移动2077246
回复
Moody
老师,我这么理解对不对。 第一点:假如说发送了50条消息给消费者,而消费者设置了限流机制,每次限流5条消息。此时50条消息处于ready状态存储在RabbitMQ的队列中,下一秒发送了5条消息给消费者,然后此时这5条消息在RabbitMQ中变为了unAck待确认状态,然后消费者获取到这5条消息存储到本地缓存队列中,因为在handleMessage方法中开启了异步线程,所以会同时开启多个线程同时处理本地缓存队列中的消息然后手动Ack确认消息,已被Ack的消息就会被RabbitMQ释放(删除)掉,然后再依次循环步骤直到消息被全部处理完成。 第二点:如果消费者没设置限流机制,那么50条消息就被队列就会一股脑把所有消息都推送给了消费者,然后这些消息在队列中全部变成了unAck状态,此时这些消息就堆积在消费者的本地缓存队列中,一条消息只能被一个消费者进行处理,既然消费者已经接收到了消息,那么无论处理成功或失败都得处理完成,然后其他的消费者就不能帮它分担压力。但是如果开启了限流机制,每一次消费者只接收指定的消息数量进行处理,剩下的消息就在队列里等待取走,如果此时队列消息数量太多,压力过大,就可以新开一个消费者从该队列中获取消息分担压力。 这是我的个人理解,不知道对不对,希望老师纠正一下
2022-10-04
共5条回复

RabbitMQ精讲,提升工程实践能力,培养架构思维

消息驱动架构+订单状态机,二次开发,手写分布式事务框架。

455 学习 · 202 问题

查看课程