老师我这段消费concurrentLinkedQueue的代码有什么地方需要改进么?
来源:6-13 线程池饱和策略之终止策略

夏目鲸鱼
2021-01-22
import lombok.Data;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Data
public class Multithreading {
private List<String> productListLack = new ArrayList<>();
private List<String> productList;
private int threadsNum = 10; // 线程数量
public Multithreading(List<String> productList) {
this.productList = productList;
}
public void scheduling() throws InterruptedException {
/**
* List<String>productList to iterate </String>
*/
Multithreading mt = new Multithreading(this.productList);
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
CountDownLatch count = new CountDownLatch(this.threadsNum);//计数器
//将product放入队列(product进入队列等待被处理)
for(String product: this.productList) {
queue.offer(product);
}
//执行${threadsNum}个线程从队列取出元素(${threadsNum}个线程开始消费)
System.out.println("-----------------------------------开始线程池填充-----------------------------------");
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(this.threadsNum);
for(int i=0;i<this.threadsNum;i++) {
executorService.submit(new productDetailCheck("00" + (i+1), queue, count, this));
}
//计数器等待,直到队列为空(所有的product code都通过Http Request进行了请求处理)
count.await();
long time = System.currentTimeMillis() - start;
System.out.println("-----------------------------------所有的product都已经被消费完毕-----------------------------------");
System.out.println("共耗时:" + time);
//停止线程池
executorService.shutdown();
System.out.println(String.format("product list lack of info: %s", this.productListLack));
}
private static class productDetailCheck implements Runnable{
private String name;
private ConcurrentLinkedQueue<String> queue;
private CountDownLatch count;
private Multithreading mt;
public productDetailCheck(String name, ConcurrentLinkedQueue<String> queue, CountDownLatch count, Multithreading mt) {
this.name = name;
this.queue = queue;
this.count = count;
this.mt = mt;
}
@Override
public void run() {
while (!queue.isEmpty()){
try{
//从队列取出一个元素 排队的product少一个
String productCode = queue.poll();
System.out.println("【" +productCode + "】----开始处理..., 所在的线程编号:" + name);
CategoryAPI categoryAPI = new CategoryAPI();
Reaction reaction = categoryAPI.productDetail(productCode);
reaction.ok();
System.out.println(reaction.toString());
//check if lack of info
checkIfLackOfInfo(reaction, productCode, mt);
Thread.sleep(100);
System.out.println("【" +productCode + "】----已处理完..., 所在的线程编号:" + name);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
count.countDown();//计数器-1
}
public void checkIfLackOfInfo(Reaction reaction, String productCode, Multithreading mt) {
Map<String, Object> dataMapper = reaction.getFirstMapInList();
List<String> fields = Arrays.asList("name", "code", "color", "unit", "imgUrl", "price", "type", "typeName");
Boolean hasEmpty = false;
for(String field: fields) {
Object fieldValue = dataMapper.get(field);
if(fieldValue == null) {
hasEmpty = true;
System.out.println(String.format("%s 's field %s is empty", productCode, field));
}
}
if(hasEmpty) {
mt.productListLack.add(productCode);
}
}
}
public static void main(String[] args) throws InterruptedException {
/**
* List<String>productList to iterate </String>
*/
List<String> productList = new ArrayList<>();
productList.add("20468353");
productList.add("10469186");
productList.add("10336243");
productList.add("50353018");
productList.add("70335486");
Multithreading mt = new Multithreading(productList);
mt.scheduling();
}
}
写回答
1回答
-
没看明白你引入CountDownLatch是想做什么。
00
相似问题