nio 监听与数据处理并行的线程模式设计,自己写了一段代码,接收不到客户端消息
来源:7-10 NIO服务器Thread优化-4
蓝胖子的编程梦
2019-07-05
学到第七章,真的吃力起来了,尝试写个简单点的demo出了问题不止如何排查。听了老师对线程优化的解读,我理解到的就是仍然是一个线程open一个监听的selector监听新连接的到来,然后一个线程open一个读selector监听读,一个线程open一个写selector监听写,具体读和写的操作,扔给线程池去做。然后下面是我代码。
server
public class TestServerMain {
public static void main(String[] args) {
try {
//监听连接到达selector
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(38080));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
IoSelectorProvider ioSelectorProvider = IoSelectorProvider.getInstance();
ioSelectorProvider.startRead();
ClientListener listener = new ClientListener(selector);
listener.start();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 客户端监听者
*/
private static class ClientListener extends Thread {
private Selector selector;
private boolean done;
public ClientListener(Selector selector) {
this.selector = selector;
}
public void exit() {
this.done = true;
selector.wakeup();
}
@Override
public void run() {
try {
while (!done) {
int num = selector.select();
if (num == 0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = server.accept();
System.out.println("客户端连接进来:" + socketChannel.getLocalAddress());
//注入输入
ReadHandler readHandler = new ReadHandler();
readHandler.registerInput(socketChannel);
}
iterator.remove();
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
CloseUtil.closeAll(selector);
}
}
}
/**
* 客户端读处理器
*/
//一个provider里有两个selector以及两个线程池,监听读单独开一个线程,selector监听到读的以后,把读的操作交给线程池去做。
private static class ReadHandler {
public void registerInput(SocketChannel socketChannel) throws IOException, InterruptedException {
IoSelectorProvider.getInstance().registerInput(socketChannel);
}
}
}
ioprovider
这里provider中 阻塞状态的selector注册channel会失败,得先唤醒,为了避免唤醒后没来得及注册,又进入select()状态,我用了Semaphore ,感觉没问题。
public class IoSelectorProvider {
private static IoSelectorProvider ioSelectorProvider;
//todo 使用原因
private AtomicBoolean isClosed = new AtomicBoolean(false);
private Semaphore semaphore = new Semaphore(1);
private Selector readSelector;
private ExecutorService readExecutorService;
private IoSelectorProvider() throws IOException {
this.readSelector = Selector.open();
this.readExecutorService = Executors.newFixedThreadPool(4);
}
public static IoSelectorProvider getInstance() throws IOException {
synchronized (IoSelectorProvider.class) {
if (ioSelectorProvider == null)
ioSelectorProvider = new IoSelectorProvider();
}
return ioSelectorProvider;
}
/**
* 注册读channel
*
* @param socketChannel
* @throws IOException
*/
public void registerInput(SocketChannel socketChannel) throws IOException, InterruptedException {
//todo 阻塞状态的selector注册channel会失败,得先唤醒
socketChannel.configureBlocking(false);
semaphore.acquire();
readSelector.wakeup();
socketChannel.register(readSelector, SelectionKey.OP_READ);
semaphore.release();
}
public void startRead() {
//监听读
Thread thread = new Thread(() -> {
try {
while (!isClosed.get()) {
//todo 如果关闭以后,是不是锁就不能释放了
semaphore.release();
if (readSelector.select() == 0) {
semaphore.acquire();
continue;
}
for (SelectionKey selectionKey : readSelector.selectedKeys()) {
if (selectionKey.isValid()) {
//交由线程池异步处理
handelReadSelection(selectionKey, readExecutorService);
}
}
readSelector.selectedKeys().clear();
}
} catch (IOException | InterruptedException e) {
//todo 异常不处理的原因
e.printStackTrace();
}
});
thread.start();
}
/**
* 处理读过程
*
* @param selectionKey
* @param readExecutorService
*/
private void handelReadSelection(SelectionKey selectionKey, ExecutorService readExecutorService) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// todo 取消对key得监听
readExecutorService.execute(new ReadRunnable(socketChannel));
}
private class ReadRunnable implements Runnable {
private SocketChannel socketChannel;
public ReadRunnable(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
int readNum = socketChannel.read(byteBuffer);
if (readNum > 0)
System.out.println("接受到客户端消息:" + new String(byteBuffer.array(), 0, readNum));
else
throw new IOException("can not read any data");
} catch (IOException e) {
//e.printStackTrace();
CloseUtil.closeAll(socketChannel);
}
}
}
}
client
public class TestClientMain {
public static void main(String[] args) throws InterruptedException, IOException {
Socket client = new Socket();
client.bind(new InetSocketAddress(38081));
client.connect(new InetSocketAddress(38080));
PrintStream printStream = new PrintStream(client.getOutputStream());
for (int i = 0; i < 100; i++) {
printStream.print(i);
// TimeUnit.MILLISECONDS.sleep(1);
}
Thread.currentThread().join();
}
}
老师真的太厉害了,第七章对我启发很大,也让我理解了nio这个东西,以前这块儿一直看不懂。
写回答
1回答
-
nio其实官方的讲解非常的少,大部分都是靠各个企业去摸索。而且目前来说NIO的优化并没有一个固定的方向,你看到后面12章节就能知道我们又把NIO的调度给改了;但是这也并不能说就是最好的,说不定还有更好的更优秀的方案。
只能说找到一个适合自己项目的方案,而没有绝对的万能方案。
NIO的思路也是值得借鉴的,建议代码可以多多看看,我看你的代码有一些地方自己进行了改动,这是非常棒的;自己有想法也有能力去改进我的代码这是非常优秀的。这也也能让你学到更多,证明了你开始在积累自己的知识,而不仅仅把知识放在我这里了。
给你点赞~~如果后续有啥问题随时提问,或者QQ找我哈。
032019-07-09
相似问题