关于消息队列和数据库中的数据一致性的问题
来源:9-10 利用RabbitMQ实现消息投递削峰填谷(一)

慕斯902xzxc_das
2022-06-04
老师您好,在发送和接收消息时都需要往数据库中写入消息,请问如何可以保证消息队列和数据库中的数据一致呢?
比如,发送消息时,如果数据库中保存成功,但是mq发送消息失败,那么这一条消息不就丢失了吗,并且数据库里还有这条消息的记录,如下:
public void send(String topic, MessageEntity entity) {
String id = messageService.insertMessage(entity);
// 假设在数据库保存之后,发送消息之前出现异常。
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(topic, true, false, false, null);
HashMap header = new HashMap();
header.put("messageId", id);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();
// 或者假设消息发送失败
channel.basicPublish("", topic, properties, entity.getMsg().getBytes());
log.debug("消息发送成功");
} catch (Exception e) {
log.error("执行异常", e);
throw new EmosException("向MQ发送消息失败");
}
}
还有就是接收消息时,如果数据库中写入成功,但是 ack 时出现异常,那么消息不就又重新写回消息队列里,那么下次就又会再往数据库里存一份相同的记录了?
public int receive(String topic) {
int i = 0;
try (
Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(topic, true, false, false, null);
while (true) {
GetResponse response = channel.basicGet(topic, false);
if (response != null) {
AMQP.BasicProperties properties = response.getProps();
Map<String, Object> header = properties.getHeaders();
String messageId = header.get("messageId").toString();
byte[] body = response.getBody();
String message = new String(body);
log.debug("从RabbitMQ接收的消息:" + message);
MessageRefEntity entity = new MessageRefEntity();
entity.setMessageId(messageId);
entity.setReceiverId(Integer.parseInt(topic));
entity.setReadFlag(false);
entity.setLastFlag(true);
messageService.insertRef(entity);
//数据保存成功,但 ack 确认失败
long deliveryTag = response.getEnvelope().getDeliveryTag();
channel.basicAck(deliveryTag, false);
i++;
} else {
break;
}
}
} catch (Exception e) {
log.error("执行异常", e);
}
return i;
}
写回答
1回答
-
神思者
2022-06-04
不可能有一致性,mongodb都没有事务,怎么保持一致性。系统通知不是高价值数据,没必要弄一致性。
00
相似问题