老师我这段消费concurrentLinkedQueue的代码有什么地方需要改进么?

来源:6-13 线程池饱和策略之终止策略

夏目鲸鱼

2021-01-22

import lombok.Data;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Data
public class Multithreading {
    private List<String> productListLack = new ArrayList<>();
    private List<String> productList;
    private int threadsNum = 10; // 线程数量


    public Multithreading(List<String> productList) {
        this.productList = productList;
    }

    public void scheduling() throws InterruptedException {
        /**
         * List<String>productList to iterate </String>
         */
        Multithreading mt = new Multithreading(this.productList);
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        CountDownLatch count = new CountDownLatch(this.threadsNum);//计数器

        //将product放入队列(product进入队列等待被处理)
        for(String product: this.productList) {
            queue.offer(product);
        }

        //执行${threadsNum}个线程从队列取出元素(${threadsNum}个线程开始消费)
        System.out.println("-----------------------------------开始线程池填充-----------------------------------");
        long start = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(this.threadsNum);
        for(int i=0;i<this.threadsNum;i++) {
            executorService.submit(new productDetailCheck("00" + (i+1),  queue, count, this));
        }
        //计数器等待,直到队列为空(所有的product code都通过Http Request进行了请求处理)
        count.await();
        long time = System.currentTimeMillis() - start;
        System.out.println("-----------------------------------所有的product都已经被消费完毕-----------------------------------");
        System.out.println("共耗时:" + time);
        //停止线程池
        executorService.shutdown();
        System.out.println(String.format("product list lack of info: %s", this.productListLack));
    }

    private static class productDetailCheck implements Runnable{
        private String name;
        private ConcurrentLinkedQueue<String> queue;
        private CountDownLatch count;
        private Multithreading mt;

        public productDetailCheck(String name, ConcurrentLinkedQueue<String> queue, CountDownLatch count, Multithreading mt) {
            this.name = name;
            this.queue = queue;
            this.count = count;
            this.mt = mt;
        }

        @Override
        public void run() {
            while (!queue.isEmpty()){
                try{
                    //从队列取出一个元素 排队的product少一个
                    String productCode = queue.poll();
                    System.out.println("【" +productCode + "】----开始处理..., 所在的线程编号:" + name);
                    CategoryAPI categoryAPI = new CategoryAPI();
                    Reaction reaction = categoryAPI.productDetail(productCode);
                    reaction.ok();
                    System.out.println(reaction.toString());
                    //check if lack of info
                    checkIfLackOfInfo(reaction, productCode, mt);

                    Thread.sleep(100);
                    System.out.println("【" +productCode + "】----已处理完..., 所在的线程编号:" + name);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            count.countDown();//计数器-1
        }

        public void checkIfLackOfInfo(Reaction reaction, String productCode, Multithreading mt) {
            Map<String, Object> dataMapper = reaction.getFirstMapInList();
            List<String> fields = Arrays.asList("name", "code", "color", "unit", "imgUrl", "price", "type", "typeName");
            Boolean hasEmpty = false;
            for(String field: fields) {
                Object fieldValue = dataMapper.get(field);
                if(fieldValue == null) {
                    hasEmpty = true;
                    System.out.println(String.format("%s 's field %s is empty", productCode, field));
                }
            }
            if(hasEmpty) {
                mt.productListLack.add(productCode);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        /**
         * List<String>productList to iterate </String>
         */
        List<String> productList = new ArrayList<>();

        productList.add("20468353");
        productList.add("10469186");
        productList.add("10336243");
        productList.add("50353018");
        productList.add("70335486");
        Multithreading mt = new Multithreading(productList);
        mt.scheduling();
    }




}

写回答

1回答

张小喜

2021-02-17

没看明白你引入CountDownLatch是想做什么。

0
0

Java高效编程技巧实践 告别996

可以改变的编程效率

1451 学习 · 326 问题

查看课程