消息消费后仍然在队列中

来源:3-5 Confirm确认消息详解

慕容5034545

2023-03-09

我的Topic类型的Exchange交换机,发现我的生产者进行投递的消息,消费者接收到消息后,消息仍然在Queue中,导致我每次启动消费者监听这个Queue时,之前的message我都会重复接收到
图片描述
图片描述
下面是我的代码

package com.rabbitmq.quickstart.confirm;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author lihongyu
 * @CreateTime 2023年3月9日
 * @Description 生产端
 */
public class Producer {

	public static void main(String[] args) throws Exception {
		// 1. 创建连接工厂
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.188.150");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("123456");
		
		// 2. 创建连接
		Connection connection = connectionFactory.newConnection();
		
		// 3. 创建通道
		Channel channel = connection.createChannel();
		
		// 4. 指定confirm类型
		channel.confirmSelect();
		
		// 5. 进行推送消息
		String exchangeName = "test_exchange_confirm";
		String routingKey = "test.confirm.ack";
		
		// 6. 进行发送消息
		String message = "Hello RabbitMQ Confirm Message !";
		channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
		
		// 7. 添加ConfirmListener
		channel.addConfirmListener(new ConfirmListener() {
			
			/**
			 * confirm失败回调函数(No Ack)
			 * deliveryTag 消息的唯一标识,实现消息可靠性投递或者进行补偿的时候需要用到这个,通常再投递的时候就会把消息的唯一标识通过Properties设置到消息中
			 * multiple:是否批量,这个暂时不用管
			 */
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				// TODO Auto-generated method stub
				System.out.println("No Ack Callback Is Active !");
			}
			
			/**
			 * confirm成功回调函数(Ack)
			 * deliveryTag 消息的唯一标识,实现消息可靠性投递或者进行补偿的时候需要用到这个,通常再投递的时候就会把消息的唯一标识通过Properties设置到消息中
			 * multiple:是否批量,这个暂时不用管
			 */
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				// TODO Auto-generated method stub
				System.out.println("Ack Callback Is Active !");
			}
		});
	}
	
}

package com.rabbitmq.quickstart.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * @author lihongyu
 * @CreateTime 2023年3月9日
 * @Description 消费端
 */
public class Consumer {

	public static void main(String[] args) throws Exception {
		// 1. 创建连接工厂
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.188.150");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("123456");
		
		// 2. 通过连接工厂获取连接
		Connection connection = connectionFactory.newConnection();
		
		// 3. 通过连接创建通道
		Channel channel = connection.createChannel();
		
		// 4. 声明交换机
		String exchangeName = "test_exchange_confirm";
		String routingKey = "test.confirm.*";
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		
		// 5. 声明队列
		String queueName = "test_queue_confirm";
		channel.queueDeclareNoWait(queueName, true, false, false, null);
		
		// 6. 绑定交换机与消费队列
		channel.queueBind(queueName, exchangeName, routingKey);
		// 7. 创建队列消费者
		QueueingConsumer queueConsumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, queueConsumer);
		Delivery nextDelivery = queueConsumer.nextDelivery();
		String message = new String(nextDelivery.getBody());
		System.out.println("消费端:" + message);
		
		channel.close();
		connection.close();
	}
	
}

写回答

1回答

慕容5034545

提问者

2023-03-09

是因为我在设置关联消费者与队列的时候,没有指定AutoAck自动签收,默认是False需要自己手动签收

0
0

RabbitMQ精讲 从0到1驾驭RabbitMQ应用与设计

从0到1,全面深入掌握RabbitMQ消息中间件技术

1460 学习 · 443 问题

查看课程