Java NIO¶
nio Support¶
NIO 就是非阻塞 IO,Java NIO 相关的类都在java.nio
包下。主要包含:
Channels¶
Channels 非阻塞读写的通道,实际上是文件或 socket 连接(文件描述符)。
以 SocketChannelImpl 为例(Channel 接口是所有通道的祖先接口),主要结构如下
Buffers¶
Buffers 缓冲区数组,可以直接被 Channels 读写,以 ByteBuffer
为例,可以使用堆内存 HeapByteBuffer
,也可以使用直接内存 DirectByteBuffer
Selectors¶
Selectors IO 多路复用的查询器,可以查询出哪些 Channel 有 IO 事件
SelectionKeys¶
SelectionKeys 维护 IO 事件的状态(连接建立、可读、可写),绑定相关的 Selector 和 Channel,以及附加操作 attachment
Java NIO 实现 Reactor¶
以下使用 Doug Lea 老爷子的 Scalable IO in Java [1^] 演讲搞中的例子,来使用 JAVA NIO 实现单线程的 reactor 模型。
Set up¶
1public class Reactor {
2 final Selector selector;
3 final ServerSocketChannel serverSocket;
4
5 /**
6 * 1. Set up
7 */
8 public Reactor(int port) throws IOException {
9 // SelectorProvider provider = SelectorProvider.provider();
10 // AbstractSelector selector = provider.openSelector();
11 // ServerSocketChannel serverSocket = provider.openServerSocketChannel();
12
13 selector = Selector.open();
14 serverSocket = ServerSocketChannel.open();
15 serverSocket.configureBlocking(false);
16 SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
17 selectionKey.attach(new Acceptor());
18 }
19}
初始化主要是创建查询器和服务端 Socket,主要包含以下操作:
打开一个事件查询器 Selector
打开一个服务端 Socket 通道,用来监听客户端的 TCP 连接
在服务端 Socket 通道上注册 Selector,查询
OP_ACCEPT
事件在对应 SelectionKey 上绑定 Acceptor, 处理建立的连接
Dispatch Loop¶
1 // class Reactor continued
2
3 /**
4 * 2. Dispatch Loop
5 */
6 @Override
7 public void run() {
8 // normally in a new Thread
9 try {
10 while (!Thread.interrupted()) {
11 // blocking, util at least one channel is selected
12 selector.select();
13 Set<SelectionKey> selected = selector.selectedKeys();
14 Iterator<SelectionKey> it = selected.iterator();
15 while (it.hasNext()) {
16 dispatch(it.next());
17 selected.clear();
18 }
19 }
20 } catch (IOException e) {
21 // handle ex
22 }
23 }
24
25 private void dispatch(SelectionKey key) {
26 Runnable runnable = (Runnable) key.attachment();
27 if (runnable != null) {
28 runnable.run();
29 }
30 }
这一部主要是持续监听客户端的连接,即有新的 OP_ACCEPT
事件后,就运行对应 SelectionKey 中绑定的处理逻辑。
Acceptor¶
1 // class Reactor continued
2
3 /**
4 * 3. Acceptor
5 */
6 public class Acceptor implements Runnable {
7 @Override
8 public void run() {
9 try {
10 SocketChannel channel = serverSocket.accept();
11 if (channel != null) {
12 new Handler(selector, channel);
13 }
14 } catch (IOException e) {
15 // handle ex
16 }
17 }
18 }
这一步就是服务端同意客户端发起的连接建立请求,并产生一个服务端和客户端之间的 SocketChannel,然后分配一个 Handler 去专门处理这个连接上读写操作。
Handler Setup¶
1public class Handler implements Runnable {
2 private static final int MAX_IN = 5 * 1024;
3 private static final int MAX_OUT = 5 * 1024;
4
5 final SocketChannel socket;
6 final SelectionKey selectionKey;
7 ByteBuffer input = ByteBuffer.allocate(MAX_IN);
8 ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
9 static final int READING = 0, SENDING = 1;
10 int state = READING;
11
12
13 public Handler(Selector selector, SocketChannel socketChannel) throws IOException {
14 socket = socketChannel;
15 socket.configureBlocking(false);
16 // Optionally try first read now
17 selectionKey = socket.register(selector, 0);
18 selectionKey.attach(this);
19 selectionKey.interestOps(SelectionKey.OP_READ);
20 selector.wakeup();
21 }
22
23 boolean inputIsComplete() { /* ... */ }
24 boolean outputIsComplete() { /* ... */ }
25 void process() { /* ... */ }
26}
Request handling¶
1// class Handler continued
2 @Override
3 public void run() {
4 try {
5 if (state == READING) {
6 read();
7 }
8 else if (state == SENDING) {
9 send();
10 }
11 } catch (IOException ex) {
12 /* ... */
13 }
14 }
15
16 void read() throws IOException {
17 socket.read(input);
18 if (inputIsComplete()) {
19 process();
20 state = SENDING;
21 // Normally also do first write now
22 selectionKey.interestOps(SelectionKey.OP_WRITE);
23 }
24 }
25
26 void send() throws IOException {
27 socket.write(output);
28 if (outputIsComplete()) sk.cancel();
29 }