向 读写Selector 注册客户端 channel 读写事件时,读写Selector 的 select() 一直为 0 的问题

来源:9-10 消息分片传输设计与实现-2

王二小的宠物狗

2019-03-06

问题描述

当启动 服务端客户端 连接成功,并向 读写selector 注册 客户端 channel 的读写事件时,读写线程无限循环检测 读写selector select() 阻塞操作 是否有就绪的事件时,select() 一直返回 0 的问题。

正常流程:

正常流程当客户端连接成功,并在 2s 后向 writeSelector 注册写事件时,IO 类中的 startWrite() 方法肯定会执行这句 System.out.println("有就绪的写事件~~~~~~~~~~~~~"); 打印,但是在下面的代码中是不执行的。

代码

服务端

Server.java

/**
 *
 */
public class Server {
    public static void main(String[] args) {
        try {
            Server server = new Server();
            server.start(8899);
        } catch (IOException e) {
            System.err.println("ERROR:" + e);
        }
    }

    private final Selector selector;
    private final ServerSocketChannel server;
    private ClientListener listener;

    public Server() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
    }

    public void start(int port) throws IOException {
        server.bind(new InetSocketAddress(port));
        server.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务端信息:" + server.getLocalAddress().toString());

        listener = new ClientListener(selector);
        listener.start();
    }
}

ClientListener.java

public class ClientListener extends Thread {
    private boolean done = false;
    private final Selector selector;

    public ClientListener(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        System.out.println("服务端准备就绪~~");
        do {
            try {
                if(selector.select() == 0) {
                    if(done) {
                        break;
                    }
                    continue;
                }

                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();

                    if(key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel channel = server.accept();
                        if(channel != null) {
                            System.out.println("有新客户端连接....");
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } while (!done);
    }
}

客户端

Client.java

public class Client {
    public static void main(String[] args) {
        Client client = null;
        try {
            client = new Client();
            client.connect("127.0.0.1", 8899);
        } catch (IOException e) {
            System.err.println("ERROR:" + e);
        }

        if(client != null) {
            try {
                Thread.sleep(2000);
                client.registerWrite();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private final SocketChannel channel;
    private IO io;

    public Client() throws IOException {
        channel = SocketChannel.open();
        channel.configureBlocking(false);
    }

    public void connect(String host, int port) throws IOException {
        channel.connect(new InetSocketAddress(host, port));
        System.out.println("已发起服务端连接.");

        io = new IO();
    }

    /**
     * 测试注册写事件
     */
    public void registerWrite() {
        try {
            System.out.println("注册写事件开始.");
            SelectionKey key = io.registerWrite(channel);
            System.out.println("注册写事件完成. key:" + key);
        } catch (ClosedChannelException e) {
            System.err.println("注册写事件失败!! " + e);
        }
    }
}

IO 操作

IO.java

public final class IO {
    private final AtomicBoolean isClosed = new AtomicBoolean();

    private final Selector readSelector;
    private final Selector writeSelector;

    public IO() throws IOException {
        readSelector = Selector.open();
        writeSelector = Selector.open();
        
        startRead();
        startWrite();
    }

    private void startRead() {
        Thread thread = new Thread("") {

            @Override
            public void run() {
                while (!isClosed.get()) {
                    try {
                        if(readSelector.select() == 0) {
                            continue;
                        }

                        Iterator<SelectionKey> iterator = readSelector.selectedKeys().iterator();
                        while(iterator.hasNext()) {
                            SelectionKey key = iterator.next();
                            iterator.remove();

                            if(key.isValid()) {
                                System.out.println("处理读事件~~~~~~~~~~~~~ key:" + key);
                            }
                        }
                    } catch (IOException e) {
                        System.out.println("处理读事件失败!! " + e);
                    }
                }
            }
        };
        thread.setPriority(Thread.MAX_PRIORITY);
        thread.start();
    }

    private void startWrite() {
        Thread thread = new Thread("") {

            @Override
            public void run() {
                while (!isClosed.get()) {
                    try {
                        if(writeSelector.select() == 0) {
                            continue;
                        }
                        System.out.println("有就绪的写事件~~~~~~~~~~~~~");

                        Iterator<SelectionKey> iterator = writeSelector.selectedKeys().iterator();
                        while(iterator.hasNext()) {
                            SelectionKey key = iterator.next();
                            iterator.remove();

                            if(key.isValid()) {
                                System.out.println("处理写事件~~~~~~~~~~~~~ key:" + key);
                            }
                        }
                    } catch (IOException e) {
                        System.out.println("处理写事件失败!! " + e);
                    }
                }
            }
        };
        thread.setPriority(Thread.MAX_PRIORITY);
        thread.start();
    }

    public SelectionKey registerWrite(SocketChannel channel) throws ClosedChannelException {
        System.out.println("注册写事件进行中......");
        writeSelector.wakeup();
        return channel.register(writeSelector, SelectionKey.OP_WRITE);
    }
}
写回答

1回答

Qiujuer

2019-03-06

你那个问题 我从昨晚找到现在终于解决了。

太隐蔽的BUG了。

首先有一个地方的错误,不是你提及的错误,是死锁问题:

你唤醒的时候因为没有使用另外的实例来进行同步,在课程中有一个Bool实例用来做同步,所以一定几率会有死锁的情况;当然这个跟你的问题无关。

//img.mukewang.com/szimg/5c7f9c83000146ac15301296.jpg

我给加了2000的延迟,为的是下面唤醒后不立刻进入到select状态,如果立刻进入了,就会导致下面的注册出现死锁的情况。

回到你的问题,尝试将注册的selector更换、真实的写数据、调试跟踪到selector内部等等都没有发现问题。

唯一发现的是selector内部的确已经注册了Key了,可以通过调试看见Key是正常的;所以一段时间内我都没有发现问题所在,一直会循环打印1111111也就是select返回0,而不是阻塞。

另外我也看了selector内部的管道机制也是没有问题的,唯一有问题的就是管道不阻塞了,一直直接返回了;这点很奇怪。

直到我再次跟踪调试的时候才发现了这个问题:

//img.mukewang.com/szimg/5c7f9cd400010b8b28801800.jpg

发现Socket连接的状态不对:java.nio.channels.SocketChannel[connection-pending remote=/127.0.0.1:58899]。是connection-pending状态。

然后我才发现你在连接前设置了不阻塞,所以导致的这个问题。

//img.mukewang.com/szimg/5c7f9cf90001bfcd11980474.jpg

解决方法2个:

  1. 要么连接前去掉非阻塞,链接后再设置非阻塞模式

  2. 也可以循环调用finishConnect方法直到返回true时即可

简单来说这个链接并为完成真正的3次握手~~~所以导致了对应注册的selector一直返回0。

以上就是我解决这个问题的简单流程。

多谢购买课程,祝学习愉快~


3
3
Qiujuer
回复
qq_生命的宣言_eBFGIP
finishConnect 在完全建立链接后返回true,是一个状态检查,不会进行等待。
2019-12-13
共3条回复

Socket网络编程进阶与实战 系统掌握Socket核心技术

理论+实践,系统且深入掌握Socket核心技术,从容应对各种Socket应用场景的不二之选

2324 学习 · 476 问题

查看课程