消息消费后仍然在队列中
来源:3-5 Confirm确认消息详解

慕容5034545
2023-03-09
我的Topic类型的Exchange交换机,发现我的生产者进行投递的消息,消费者接收到消息后,消息仍然在Queue中,导致我每次启动消费者监听这个Queue时,之前的message我都会重复接收到
下面是我的代码
package com.rabbitmq.quickstart.confirm;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author lihongyu
* @CreateTime 2023年3月9日
* @Description 生产端
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.188.150");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 2. 创建连接
Connection connection = connectionFactory.newConnection();
// 3. 创建通道
Channel channel = connection.createChannel();
// 4. 指定confirm类型
channel.confirmSelect();
// 5. 进行推送消息
String exchangeName = "test_exchange_confirm";
String routingKey = "test.confirm.ack";
// 6. 进行发送消息
String message = "Hello RabbitMQ Confirm Message !";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
// 7. 添加ConfirmListener
channel.addConfirmListener(new ConfirmListener() {
/**
* confirm失败回调函数(No Ack)
* deliveryTag 消息的唯一标识,实现消息可靠性投递或者进行补偿的时候需要用到这个,通常再投递的时候就会把消息的唯一标识通过Properties设置到消息中
* multiple:是否批量,这个暂时不用管
*/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// TODO Auto-generated method stub
System.out.println("No Ack Callback Is Active !");
}
/**
* confirm成功回调函数(Ack)
* deliveryTag 消息的唯一标识,实现消息可靠性投递或者进行补偿的时候需要用到这个,通常再投递的时候就会把消息的唯一标识通过Properties设置到消息中
* multiple:是否批量,这个暂时不用管
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// TODO Auto-generated method stub
System.out.println("Ack Callback Is Active !");
}
});
}
}
package com.rabbitmq.quickstart.confirm;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* @author lihongyu
* @CreateTime 2023年3月9日
* @Description 消费端
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.188.150");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 2. 通过连接工厂获取连接
Connection connection = connectionFactory.newConnection();
// 3. 通过连接创建通道
Channel channel = connection.createChannel();
// 4. 声明交换机
String exchangeName = "test_exchange_confirm";
String routingKey = "test.confirm.*";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
// 5. 声明队列
String queueName = "test_queue_confirm";
channel.queueDeclareNoWait(queueName, true, false, false, null);
// 6. 绑定交换机与消费队列
channel.queueBind(queueName, exchangeName, routingKey);
// 7. 创建队列消费者
QueueingConsumer queueConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, queueConsumer);
Delivery nextDelivery = queueConsumer.nextDelivery();
String message = new String(nextDelivery.getBody());
System.out.println("消费端:" + message);
channel.close();
connection.close();
}
}
写回答
1回答
-
慕容5034545
提问者
2023-03-09
是因为我在设置关联消费者与队列的时候,没有指定AutoAck自动签收,默认是False需要自己手动签收
00
相似问题