我采用ForkJoin框架进行批量插入一个任务失败了其他任务的数据不会回滚

来源:7-3 J.U.C-ForkJoin

qq_不弃_7

2020-01-13

    class ForkJoinInsert extends RecursiveTask<Integer> {
    private  int start;
    private  int end;
    private  List<PhoneBill> list;
    int sum = 0;
    private static final int THRESHOLD = 10000; //临界值
    public ForkJoinInsert(int start, int end,List<PhoneBill> list) {
        this.start = start;
        this.end = end;
        this.list = list;
    }

    @Override
    protected Integer compute()  {

        int length = end -start;
        if (length <= THRESHOLD){
            //在临界值范围内,实现业务代码
            List<PhoneBill> temList = new ArrayList<>();
            for (int i = start;i <=end;i++){
                if(i==list.size()){
                    break;
                }
                temList.add(list.get(i));
            }
            //入库操作
            phoneBillService.batchAddPhoneBill(temList);
            sum++;
            return sum;
        }else {
            //不在临界值范围,继续拆分
            int middle = (start + end)/2;

            ForkJoinInsert left =new ForkJoinInsert(start,middle,list);
            left.fork();
            ForkJoinInsert right = new ForkJoinInsert(middle + 1,end ,list);
            right.fork();
            int l = left.join() + right.join();
            return l;
        }
    }

调用次类的service

@Transactional(rollbackFor = Exception.class)
@Override
public void impExcel(MultipartFile file) {
    String fileName = file.getOriginalFilename();
    String suffix = fileName.substring(fileName.lastIndexOf(".") + 1);
    InputStream in = null;
    try {
        in = file.getInputStream();
        Workbook wb = null;
        if (suffix.equals("xlsx")) {
            wb = new XSSFWorkbook(in);

        } else {
            wb = new HSSFWorkbook(in);
        }
        //获取excel第一页
        Sheet sheet = wb.getSheetAt(0);
        List<PhoneBill> saveList = new ArrayList<PhoneBill>();
        if (!ObjectUtils.isEmpty(sheet)) {
            for (int line = 1, maxline = sheet.getLastRowNum(); line <= maxline; line++) {
                Row row = sheet.getRow(line);
                if (null == row) {
                    continue;
                }
                //获取每行的指定列数据【电话号,日期,金额】
                Cell cell2 = row.getCell(2);
                cell2.setCellType(CellType.STRING);
                String phone = cell2.getStringCellValue();
                Cell cell4 = row.getCell(4);
                cell4.setCellType(CellType.NUMERIC);
                LocalDateTime payDate = DateUtil.getLocalDateTime(cell4.getDateCellValue());
                Cell cell6 = row.getCell(6);
                cell6.setCellType(CellType.STRING);
                BigDecimal cost = BigDecimal.valueOf(Double.parseDouble(cell6.getStringCellValue()));
                PhoneBill phoneBill = null;
                if (ObjectUtils.isEmpty(phoneBill)) {
                    phoneBill = new PhoneBill();
                    phoneBill.setPhone(phone);
                    phoneBill.setPayDate(payDate);
                    phoneBill.setCost(cost);
                    phoneBill.setCreatedAt(LocalDateTime.now());
                    phoneBill.setUpdatedAt(LocalDateTime.now());
                    saveList.add(phoneBill);
                }
            }
            long begin = System.currentTimeMillis();
            Integer compute = 0;
            try {
                ForkJoinInsert fjc = new ForkJoinInsert(0, saveList.size(), saveList);
                compute = fjc.compute();
            } catch (Exception ex) {
                ex.printStackTrace();
                throw new PhoneException(500, "批量新增出错");
            }

            long end = System.currentTimeMillis();
            System.out.println("总时间:" + (end - begin) / 1000F);
            System.out.println("总长度" + saveList.size());
            System.out.println("总次数" + compute);
        }
    } catch (IOException ex) {
        throw new PhoneException(500, "导入失败");
    } finally {
        try {
            in.close();
        } catch (Exception ex) {
            throw new PhoneException(500, "导入失败");
        }
    }
}

其中一个任务处理失败了其他的任务插入的数据并没有回滚
请问下老师如何控制一个子任务出错所有数据都回滚呢

写回答

1回答

Jimin

2020-01-13

你好,这个已经是多线程执行了,每个线程的执行是分开的,不具备普通事务的能力。如果硬要回滚,只能尝试引入分布式事务了,但并不一定真的能解决问题。我的建议是,你重新调整一下设计,处理好可能出现的问题,正常这个框架不该存在需要回滚的操作。其实回滚这种操作在分布式环境下很难,除非是一个线程多次操作一个db这种场景,稍微复杂一点回滚就会失效。

0
1
qq_不弃_7
谢谢老师
2020-01-14
共1条回复

Java高并发编程,构建并发知识体系,提升面试成功率

构建完整并发与高并发知识体系,倍增高薪面试成功率!

3923 学习 · 832 问题

查看课程