我采用ForkJoin框架进行批量插入一个任务失败了其他任务的数据不会回滚
来源:7-3 J.U.C-ForkJoin
qq_不弃_7
2020-01-13
class ForkJoinInsert extends RecursiveTask<Integer> {
private int start;
private int end;
private List<PhoneBill> list;
int sum = 0;
private static final int THRESHOLD = 10000; //临界值
public ForkJoinInsert(int start, int end,List<PhoneBill> list) {
this.start = start;
this.end = end;
this.list = list;
}
@Override
protected Integer compute() {
int length = end -start;
if (length <= THRESHOLD){
//在临界值范围内,实现业务代码
List<PhoneBill> temList = new ArrayList<>();
for (int i = start;i <=end;i++){
if(i==list.size()){
break;
}
temList.add(list.get(i));
}
//入库操作
phoneBillService.batchAddPhoneBill(temList);
sum++;
return sum;
}else {
//不在临界值范围,继续拆分
int middle = (start + end)/2;
ForkJoinInsert left =new ForkJoinInsert(start,middle,list);
left.fork();
ForkJoinInsert right = new ForkJoinInsert(middle + 1,end ,list);
right.fork();
int l = left.join() + right.join();
return l;
}
}
调用次类的service
@Transactional(rollbackFor = Exception.class)
@Override
public void impExcel(MultipartFile file) {
String fileName = file.getOriginalFilename();
String suffix = fileName.substring(fileName.lastIndexOf(".") + 1);
InputStream in = null;
try {
in = file.getInputStream();
Workbook wb = null;
if (suffix.equals("xlsx")) {
wb = new XSSFWorkbook(in);
} else {
wb = new HSSFWorkbook(in);
}
//获取excel第一页
Sheet sheet = wb.getSheetAt(0);
List<PhoneBill> saveList = new ArrayList<PhoneBill>();
if (!ObjectUtils.isEmpty(sheet)) {
for (int line = 1, maxline = sheet.getLastRowNum(); line <= maxline; line++) {
Row row = sheet.getRow(line);
if (null == row) {
continue;
}
//获取每行的指定列数据【电话号,日期,金额】
Cell cell2 = row.getCell(2);
cell2.setCellType(CellType.STRING);
String phone = cell2.getStringCellValue();
Cell cell4 = row.getCell(4);
cell4.setCellType(CellType.NUMERIC);
LocalDateTime payDate = DateUtil.getLocalDateTime(cell4.getDateCellValue());
Cell cell6 = row.getCell(6);
cell6.setCellType(CellType.STRING);
BigDecimal cost = BigDecimal.valueOf(Double.parseDouble(cell6.getStringCellValue()));
PhoneBill phoneBill = null;
if (ObjectUtils.isEmpty(phoneBill)) {
phoneBill = new PhoneBill();
phoneBill.setPhone(phone);
phoneBill.setPayDate(payDate);
phoneBill.setCost(cost);
phoneBill.setCreatedAt(LocalDateTime.now());
phoneBill.setUpdatedAt(LocalDateTime.now());
saveList.add(phoneBill);
}
}
long begin = System.currentTimeMillis();
Integer compute = 0;
try {
ForkJoinInsert fjc = new ForkJoinInsert(0, saveList.size(), saveList);
compute = fjc.compute();
} catch (Exception ex) {
ex.printStackTrace();
throw new PhoneException(500, "批量新增出错");
}
long end = System.currentTimeMillis();
System.out.println("总时间:" + (end - begin) / 1000F);
System.out.println("总长度" + saveList.size());
System.out.println("总次数" + compute);
}
} catch (IOException ex) {
throw new PhoneException(500, "导入失败");
} finally {
try {
in.close();
} catch (Exception ex) {
throw new PhoneException(500, "导入失败");
}
}
}
其中一个任务处理失败了其他的任务插入的数据并没有回滚
请问下老师如何控制一个子任务出错所有数据都回滚呢
写回答
1回答
-
Jimin
2020-01-13
你好,这个已经是多线程执行了,每个线程的执行是分开的,不具备普通事务的能力。如果硬要回滚,只能尝试引入分布式事务了,但并不一定真的能解决问题。我的建议是,你重新调整一下设计,处理好可能出现的问题,正常这个框架不该存在需要回滚的操作。其实回滚这种操作在分布式环境下很难,除非是一个线程多次操作一个db这种场景,稍微复杂一点回滚就会失效。
012020-01-14
相似问题