nio 监听与数据处理并行的线程模式设计,自己写了一段代码,接收不到客户端消息

来源:7-10 NIO服务器Thread优化-4

蓝胖子的编程梦

2019-07-05

学到第七章,真的吃力起来了,尝试写个简单点的demo出了问题不止如何排查。听了老师对线程优化的解读,我理解到的就是仍然是一个线程open一个监听的selector监听新连接的到来,然后一个线程open一个读selector监听读,一个线程open一个写selector监听写,具体读和写的操作,扔给线程池去做。然后下面是我代码。
server

public class TestServerMain {

    public static void main(String[] args) {
        try {
            //监听连接到达selector
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(38080));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            IoSelectorProvider ioSelectorProvider = IoSelectorProvider.getInstance();
            ioSelectorProvider.startRead();
            ClientListener listener = new ClientListener(selector);
            listener.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 客户端监听者
     */
    private static class ClientListener extends Thread {
        private Selector selector;

        private boolean done;

        public ClientListener(Selector selector) {
            this.selector = selector;
        }

        public void exit() {
            this.done = true;
            selector.wakeup();
        }

        @Override
        public void run() {
            try {
                while (!done) {
                    int num = selector.select();
                    if (num == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        if (selectionKey.isAcceptable()) {
                            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                            SocketChannel socketChannel = server.accept();
                            System.out.println("客户端连接进来:" + socketChannel.getLocalAddress());
                            //注入输入
                            ReadHandler readHandler = new ReadHandler();
                            readHandler.registerInput(socketChannel);
                        }
                        iterator.remove();
                    }
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            } finally {
                CloseUtil.closeAll(selector);
            }
        }
    }

    /**
     * 客户端读处理器
     */
    //一个provider里有两个selector以及两个线程池,监听读单独开一个线程,selector监听到读的以后,把读的操作交给线程池去做。
    private static class ReadHandler {

        public void registerInput(SocketChannel socketChannel) throws IOException, InterruptedException {
            IoSelectorProvider.getInstance().registerInput(socketChannel);
        }

    }

}

ioprovider

这里provider中 阻塞状态的selector注册channel会失败,得先唤醒,为了避免唤醒后没来得及注册,又进入select()状态,我用了Semaphore ,感觉没问题。

public class IoSelectorProvider {

    private static IoSelectorProvider ioSelectorProvider;

    //todo   使用原因
    private AtomicBoolean isClosed = new AtomicBoolean(false);

    private Semaphore semaphore = new Semaphore(1);

    private Selector readSelector;

    private ExecutorService readExecutorService;

    private IoSelectorProvider() throws IOException {
        this.readSelector = Selector.open();
        this.readExecutorService = Executors.newFixedThreadPool(4);
    }

    public static IoSelectorProvider getInstance() throws IOException {
        synchronized (IoSelectorProvider.class) {
            if (ioSelectorProvider == null)
                ioSelectorProvider = new IoSelectorProvider();
        }
        return ioSelectorProvider;
    }

    /**
     * 注册读channel
     *
     * @param socketChannel
     * @throws IOException
     */
    public void registerInput(SocketChannel socketChannel) throws IOException, InterruptedException {
        //todo 阻塞状态的selector注册channel会失败,得先唤醒
        socketChannel.configureBlocking(false);
        semaphore.acquire();
        readSelector.wakeup();
        socketChannel.register(readSelector, SelectionKey.OP_READ);
        semaphore.release();
    }

    public void startRead() {
        //监听读
        Thread thread = new Thread(() -> {
            try {
                while (!isClosed.get()) {
                    //todo 如果关闭以后,是不是锁就不能释放了
                    semaphore.release();
                    if (readSelector.select() == 0) {
                        semaphore.acquire();
                        continue;
                    }
                    for (SelectionKey selectionKey : readSelector.selectedKeys()) {
                        if (selectionKey.isValid()) {
                            //交由线程池异步处理
                            handelReadSelection(selectionKey, readExecutorService);
                        }
                    }
                    readSelector.selectedKeys().clear();
                }
            } catch (IOException | InterruptedException e) {
                //todo  异常不处理的原因
                e.printStackTrace();
            }
        });
        thread.start();
    }

    /**
     * 处理读过程
     *
     * @param selectionKey
     * @param readExecutorService
     */
    private void handelReadSelection(SelectionKey selectionKey, ExecutorService readExecutorService) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // todo 取消对key得监听
        readExecutorService.execute(new ReadRunnable(socketChannel));
    }


    private class ReadRunnable implements Runnable {
        private SocketChannel socketChannel;

        public ReadRunnable(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void run() {
            try {
                ByteBuffer byteBuffer = ByteBuffer.allocate(512);
                int readNum = socketChannel.read(byteBuffer);
                if (readNum > 0)
                    System.out.println("接受到客户端消息:" + new String(byteBuffer.array(), 0, readNum));
                else
                    throw new IOException("can not read any data");
            } catch (IOException e) {
                //e.printStackTrace();
                CloseUtil.closeAll(socketChannel);
            }
        }
    }

}

client

public class TestClientMain {
    public static void main(String[] args) throws InterruptedException, IOException {
        Socket client = new Socket();
        client.bind(new InetSocketAddress(38081));
        client.connect(new InetSocketAddress(38080));
        PrintStream printStream = new PrintStream(client.getOutputStream());
        for (int i = 0; i < 100; i++) {
            printStream.print(i);
//            TimeUnit.MILLISECONDS.sleep(1);
        }
        Thread.currentThread().join();

    }
}

老师真的太厉害了,第七章对我启发很大,也让我理解了nio这个东西,以前这块儿一直看不懂。

写回答

1回答

Qiujuer

2019-07-05

nio其实官方的讲解非常的少,大部分都是靠各个企业去摸索。而且目前来说NIO的优化并没有一个固定的方向,你看到后面12章节就能知道我们又把NIO的调度给改了;但是这也并不能说就是最好的,说不定还有更好的更优秀的方案。

只能说找到一个适合自己项目的方案,而没有绝对的万能方案。

NIO的思路也是值得借鉴的,建议代码可以多多看看,我看你的代码有一些地方自己进行了改动,这是非常棒的;自己有想法也有能力去改进我的代码这是非常优秀的。这也也能让你学到更多,证明了你开始在积累自己的知识,而不仅仅把知识放在我这里了。

给你点赞~~如果后续有啥问题随时提问,或者QQ找我哈。

0
3
蓝胖子的编程梦
非常感谢!
2019-07-09
共3条回复

Socket网络编程进阶与实战 系统掌握Socket核心技术

理论+实践,系统且深入掌握Socket核心技术,从容应对各种Socket应用场景的不二之选

2316 学习 · 476 问题

查看课程