Java NIO

nio Support

NIO 就是非阻塞 IO,Java NIO 相关的类都在java.nio包下。主要包含:

alt text

Channels

Channels 非阻塞读写的通道,实际上是文件或 socket 连接(文件描述符)。

以 SocketChannelImpl 为例(Channel 接口是所有通道的祖先接口),主要结构如下

alt text

Buffers

Buffers 缓冲区数组,可以直接被 Channels 读写,以 ByteBuffer 为例,可以使用堆内存 HeapByteBuffer,也可以使用直接内存 DirectByteBuffer

alt text

Selectors

Selectors IO 多路复用的查询器,可以查询出哪些 Channel 有 IO 事件 alt text

SelectionKeys

SelectionKeys 维护 IO 事件的状态(连接建立、可读、可写),绑定相关的 Selector 和 Channel,以及附加操作 attachment

Java NIO 实现 Reactor

以下使用 Doug Lea 老爷子的 Scalable IO in Java [1^] 演讲搞中的例子,来使用 JAVA NIO 实现单线程的 reactor 模型。

alt text

Set up

 1public class Reactor {
 2    final Selector selector;
 3    final ServerSocketChannel serverSocket;
 4
 5    /**
 6     * 1. Set up
 7     */
 8    public Reactor(int port) throws IOException {
 9        // SelectorProvider provider = SelectorProvider.provider();
10        // AbstractSelector selector = provider.openSelector();
11        // ServerSocketChannel serverSocket = provider.openServerSocketChannel();
12
13        selector = Selector.open();
14        serverSocket = ServerSocketChannel.open();
15        serverSocket.configureBlocking(false);
16        SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
17        selectionKey.attach(new Acceptor());
18    }
19}

初始化主要是创建查询器和服务端 Socket,主要包含以下操作:

  • 打开一个事件查询器 Selector

  • 打开一个服务端 Socket 通道,用来监听客户端的 TCP 连接

  • 在服务端 Socket 通道上注册 Selector,查询OP_ACCEPT事件

  • 在对应 SelectionKey 上绑定 Acceptor, 处理建立的连接

Dispatch Loop

 1    // class Reactor continued
 2
 3    /**
 4     * 2. Dispatch Loop
 5     */
 6    @Override
 7    public void run() {
 8        // normally in a new Thread
 9        try {
10            while (!Thread.interrupted()) {
11                // blocking, util at least one channel is selected
12                selector.select();
13                Set<SelectionKey> selected = selector.selectedKeys();
14                Iterator<SelectionKey> it = selected.iterator();
15                while (it.hasNext()) {
16                    dispatch(it.next());
17                    selected.clear();
18                }
19            }
20        } catch (IOException e) {
21            // handle ex
22        }
23    }
24
25    private void dispatch(SelectionKey key) {
26        Runnable runnable = (Runnable) key.attachment();
27        if (runnable != null) {
28            runnable.run();
29        }
30    }

这一部主要是持续监听客户端的连接,即有新的 OP_ACCEPT 事件后,就运行对应 SelectionKey 中绑定的处理逻辑。

Acceptor

 1    // class Reactor continued
 2    
 3    /**
 4     * 3. Acceptor
 5     */
 6    public class Acceptor implements Runnable {
 7        @Override
 8        public void run() {
 9            try {
10                SocketChannel channel = serverSocket.accept();
11                if (channel != null) {
12                    new Handler(selector, channel);
13                }
14            } catch (IOException e) {
15                // handle ex
16            }
17        }
18    }

这一步就是服务端同意客户端发起的连接建立请求,并产生一个服务端和客户端之间的 SocketChannel,然后分配一个 Handler 去专门处理这个连接上读写操作。

Handler Setup

 1public class Handler implements Runnable {
 2    private static final int MAX_IN = 5 * 1024;
 3    private static final int MAX_OUT = 5 * 1024;
 4
 5    final SocketChannel socket;
 6    final SelectionKey selectionKey;
 7    ByteBuffer input = ByteBuffer.allocate(MAX_IN);
 8    ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
 9    static final int READING = 0, SENDING = 1;
10    int state = READING;
11
12
13    public Handler(Selector selector, SocketChannel socketChannel) throws IOException {
14        socket = socketChannel;
15        socket.configureBlocking(false);
16        // Optionally try first read now
17        selectionKey = socket.register(selector, 0);
18        selectionKey.attach(this);
19        selectionKey.interestOps(SelectionKey.OP_READ);
20        selector.wakeup();
21    }
22
23    boolean inputIsComplete() { /* ... */ }
24    boolean outputIsComplete() { /* ... */ }
25    void process() { /* ... */ }
26}

Request handling

 1// class Handler continued 
 2    @Override
 3    public void run() {
 4        try {
 5            if (state == READING) {
 6                read();
 7            }
 8            else if (state == SENDING) {
 9                send();
10            }
11        } catch (IOException ex) {
12            /* ... */
13        }
14    }
15
16    void read() throws IOException {
17        socket.read(input);
18        if (inputIsComplete()) {
19            process();
20            state = SENDING;
21            // Normally also do first write now
22            selectionKey.interestOps(SelectionKey.OP_WRITE);
23        }
24    }
25
26    void send() throws IOException {
27        socket.write(output);
28        if (outputIsComplete()) sk.cancel();
29    }