关于消息队列和数据库中的数据一致性的问题

来源:9-10 利用RabbitMQ实现消息投递削峰填谷(一)

慕斯902xzxc_das

2022-06-04

老师您好,在发送和接收消息时都需要往数据库中写入消息,请问如何可以保证消息队列和数据库中的数据一致呢?

比如,发送消息时,如果数据库中保存成功,但是mq发送消息失败,那么这一条消息不就丢失了吗,并且数据库里还有这条消息的记录,如下:

   public void send(String topic, MessageEntity entity) {
        String id = messageService.insertMessage(entity); 
        // 假设在数据库保存之后,发送消息之前出现异常。
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(topic, true, false, false, null);
            HashMap header = new HashMap();
            header.put("messageId", id);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();
            // 或者假设消息发送失败
            channel.basicPublish("", topic, properties, entity.getMsg().getBytes());
            log.debug("消息发送成功");
        } catch (Exception e) {
            log.error("执行异常", e);
            throw new EmosException("向MQ发送消息失败");
        }
    }

还有就是接收消息时,如果数据库中写入成功,但是 ack 时出现异常,那么消息不就又重新写回消息队列里,那么下次就又会再往数据库里存一份相同的记录了?

public int receive(String topic) {
        int i = 0;
        try (
             Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(topic, true, false, false, null);
            while (true) {
                GetResponse response = channel.basicGet(topic, false);
                if (response != null) {
                    AMQP.BasicProperties properties = response.getProps();
                    Map<String, Object> header = properties.getHeaders(); 
                    String messageId = header.get("messageId").toString();
                    byte[] body = response.getBody();
                    String message = new String(body);
                    log.debug("从RabbitMQ接收的消息:" + message);
                    MessageRefEntity entity = new MessageRefEntity();
                    entity.setMessageId(messageId);
                    entity.setReceiverId(Integer.parseInt(topic));
                    entity.setReadFlag(false);
                    entity.setLastFlag(true);
                    messageService.insertRef(entity); 
                    //数据保存成功,但 ack 确认失败
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    i++;
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            log.error("执行异常", e);
        }
        return i;
    }
写回答

1回答

神思者

2022-06-04

不可能有一致性,mongodb都没有事务,怎么保持一致性。系统通知不是高价值数据,没必要弄一致性。

0
0

SpringBoot 在线协同办公小程序开发 全栈式项目实战

未来趋势型业务 + 前后端综合技术栈 + 惊艳的细节打磨

1802 学习 · 1919 问题

查看课程