回查事务函数checkLocalTransaction 会执行多次,正常吗???

来源:9-12 分布式事务02-编码实现

hthonor

2020-10-21

debug 断点跟 大目老师的一样

kill 掉 content-center 后 重启,多次点击 F9,会多次执行 checkLocalTransaction 函数,正常吗?

“加积分” 也执行了好几次。

http://img.mukewang.com/szimg/5f8ff8b90970453114920250.jpg

我的 @RocketMQTransactionListener注解 没有txProducerGroup 属性

我的代码

public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {
   //1.查询 share 是否存在
   Share share = this.shareMapper.selectByPrimaryKey(id);
   if (share == null) {
       throw new IllegalArgumentException("分享不存在!");
   }
   if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
       throw new IllegalArgumentException("参数非法!已经审核或者审核未通过");
   }

   //2.发送事务消息,如果是 PASS
   if (AuditStatusEnum.PASS.equals(shareAuditDTO.getAuditStatusEnum())) {
       String transactionId = UUID.randomUUID().toString();
       this.rocketMQTemplate.sendMessageInTransaction(
               "add-bonus",
               MessageBuilder
                       .withPayload(
                               UserAddBonusMsgDTO.builder()
                                       .userId(share.getUserId())
                                       .bonus(50)
                                       .build()
                       )
                       .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                       .setHeaderIfAbsent("shared_id", id)
                       .build(),
               shareAuditDTO
       );
   }else {  //reject 状态,就不发送消息
       this.auditByIdInDB(id, shareAuditDTO);
   }
   return share;
}




@RocketMQTransactionListener()
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
   private final ShareService shareService;
   private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
   /**
    * 发送事务消息函数 sendMessageInTransaction(String destination, Message<?> message, Object arg)
    * 参数对应起来的
    */
   @Override
   public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
       MessageHeaders headers= message.getHeaders();

       String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
       Integer sharedId = Integer.valueOf((String) headers.get("shared_id"));

       try {
           this.shareService.auditByIdWithRocketMqLog(sharedId, (ShareAuditDTO) arg,transactionId);
           return RocketMQLocalTransactionState.COMMIT;
       }catch (Exception e){
           return RocketMQLocalTransactionState.ROLLBACK;
       }
   }

   @Override
   public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
       MessageHeaders  headers= message.getHeaders();

       String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
       Integer sharedId = Integer.valueOf((String) headers.get("shared_id"));

       RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(
               RocketmqTransactionLog.builder()
                       .transactionId(transactionId).
                       build()
       );
       if(rocketmqTransactionLog != null){
           return RocketMQLocalTransactionState.COMMIT;
       }

       return RocketMQLocalTransactionState.ROLLBACK;
   }
}

写回答

1回答

大目

2020-10-22

您好,正常的,rocketmq会定时扫描半消息,并检查本地事务有没有提交。

0
2
大目
回复
hthonor
rocketmq的事务消息有重试机制
2020-10-24
共2条回复

Spring Cloud Alibaba微服务从入门到进阶

面向未来微服务:熟练掌握Spring Cloud Alibaba

3083 学习 · 1324 问题

查看课程