NIO

NIO

一. I/O模型

1.1 同步异步

理解这些概念要结合相应的语境,一般主要在进程通信I/O系统调用方面讨论这些概念。

进程通信:进程间通信由 send()receive() 两种动作完成,消息的传递可能是阻塞非阻塞的,也可以叫做同步异步的,此处二者指相同概念。可以根据和发送或接收的组合分为以下四种方式。

简单的说,同步指过程中各个任务逐个执行,需要等待,异步则是过程中任务可以并发执行,无需等待。

1.2 阻塞与非阻塞

  • 阻塞式发送(blocking send):发送方进程会被一直阻塞, 直到消息被接受方进程收到。
  • 非阻塞式发送(nonblocking send): 发送方进程调用 send() 后, 立即就可以其他操作。
  • 阻塞式接收(blocking receive):接收方调用 receive() 后一直阻塞, 直到消息到达可用。
  • 非阻塞式接受(nonblocking receive): 接收方调用 receive() 函数后, 要么得到一个有效的结果,要么得到一个空值, 即不会被阻塞。

阻塞是指进程在发起一个系统调用(System Call)后,等待调用操作完成时,内核将进程挂起为等待(waiting)状态,以确保它此时不会被调度执行。

1.3 阻塞式IO与非阻塞式IO

  • I/O系统调用(I/O System Call):阻塞与系统调用紧密相关,现代计算机中物理通信操作通常是异步的,而操作系统则默认提供阻塞式的I/O系统调用接口(blocking systemcall),这样使应用代码编写更简单(代码执行顺序与编写顺序一致)。当然也会提供非阻塞式的I/O系统调用接口(nonblocking systemcall),不会挂起程序,而是立刻返回一个值。
  • 非阻塞系统调用(non-blocking I/O system call 与 asynchronous I/O system call) 的存在可以用来实现线程级别的 I/O 并发, 与通过多进程实现的 I/O 并发相比可以减少内存消耗以及进程切换的开销。

Java中常见的IO都是阻塞式IO,如通过Socket来读取数据,调用 read() 方法后,若数据未准备就绪,当前线程就会一直阻塞,直到有数据返回。而如果是非阻塞式IO,则会立即返回,告知当前线程数据未准备就绪。

1.4 同步IO与异步IO

事实上,同步IO和异步IO模型是针对用户线程和内核的交互来说的:

  • 同步IO:当用户发出IO请求操作之后,如果数据没有就绪,需要通过用户线程或者内核不断地去轮询数据是否就绪,当数据就绪时,再将数据从内核拷贝到用户线程;

  • 异步IO:只有IO请求操作的发出是由用户线程来进行的,IO操作的两个阶段都是由内核自动完成,然后发送通知告知用户线程IO操作已经完成。也就是说在异步IO中,不会对用户线程产生任何阻塞。

这是同步IO和异步IO关键区别所在,同步IO和异步IO的关键区别反映在数据拷贝阶段是由用户线程完成还是内核完成。所以说异步IO必须要有操作系统的底层支持。

异步I/O系统调用非阻塞式的I/O系统调用类似,不会等待I/O操作完成,应用程序可以继续执行其他操作,待I/O完成时操作系统通知调用进程。二者的区别是非阻塞会立即返回数据(无论数据是否完整),而异步则要求结果是完整的,允许延迟获取。

1.5 5种IO模型

常见的5种IO模型包括:阻塞I/O、非阻塞IO、多路复用IO、信号驱动IO、异步IO。

一个完整的I/O操作包括两个阶段:

  1. 查看数据是否就绪;
  2. 进行数据拷贝(内核将数据拷贝到用户线程)。

需要说明的是等待就绪的阻塞是不使用CPU的,是在“空等”;而真正的读写操作的阻塞是使用CPU的,真正在”干活”,而且这个过程非常快,属于memory copy,带宽通常在1GB/s级别以上,可以理解为基本不耗时。

1.5.1 完全阻塞模型(BIO,Blocking I/O)

读写过程都是阻塞式的,直到收到结果前一直处于阻塞状态。

1
data = socket.read(); 

1.5.2 非阻塞I/O模型

当请求的I/O操作无法完成时,返回一个错误信息,进程仍然工作进行轮询直到操作完成。也就是说任务仍在继续进行,CPU也还在计算。

1
2
3
4
5
6
7
while(true){ 
data = socket.read(); // 不断轮询
if(data != error){
// 处理数据
break;
}
}

非阻塞式IO这种持续循环的方式很容易导致CPU占用率过高,所以很少被采用。

1.5.3 I/O多路复用模型

请求I/O操作无法完成,线程阻塞,但进程会同时检测多个资源,直到有资源可用。多路复用IO被使用的场合很多,Redis实现了多路复用IO,还有Java NIO本身就是多路复用IO。

在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写事件时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有在真正有socket读写事件进行时才会使用IO资源,所以它大大减少了资源占用。

在Java NIO中,是通过 selector.select() 去查询每个通道是否有到达事件,如果没有事件,则一直阻塞在那里,因此这种方式会导致用户线程的阻塞。

也许有朋友会说,我可以采用 多线程 + 阻塞IO 达到类似的效果,但是由于在 多线程 + 阻塞IO 中,每个socket对应一个线程,这样会造成很大的资源占用,并且尤其是对于长连接来说,线程的资源一直不会释放,如果后面陆续有很多连接的话,就会造成性能上的瓶颈。而多路复用IO模式,通过一个线程就可以管理多个socket,只有当socket真正有读写事件发生才会占用资源来进行实际的读写操作。因此,多路复用IO比较适合连接数比较多的情况

另外多路复用IO为何比非阻塞IO模型的效率高是因为在非阻塞IO中,不断地询问socket状态是通过用户线程去进行的,而在多路复用IO中,轮询每个socket状态是内核在进行的,这个效率要比用户线程要高的多。

不过要注意的是,多路复用IO模型是通过轮询的方式来检测是否有事件到达,并且对到达的事件逐一进行响应。因此对于多路复用IO模型来说,一旦事件响应体很大,那么就会导致后续的事件迟迟得不到处理,并且会影响新的事件轮询。

1.5.4 信号驱动模型SIGIO

SIGIO多路复用I/O 的区别就在于 多路复用I/O 会阻塞应用程序SIGIO 则不会。请求I/O操作失败,返回一个信号处理器,线程继续运行,待资源释放时,线程在信号处理器处理收到信号,并通知系统可以读取数据。

在信号驱动IO模型中,当用户线程发起一个IO请求操作,会给对应的socket注册一个信号函数,然后用户线程会继续执行,当内核数据就绪时会发送一个信号给用户线程,用户线程接收到信号之后,便在信号函数中调用IO读写操作来进行实际的IO请求操作。

1.5.5 异步I/0模型ASYNC IO

请求I/O操作失败,进程不会阻塞,直接返回,等到数据操作结束,告知进程I/O操作已完成。

异步IO模型是最理想的IO模型,在异步IO模型中,当用户线程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从内核的角度,当它收到一个asynchronous read之后,它会立刻返回,说明read请求已经成功发起了,因此不会对用户线程产生任何block。然后,内核会等待数据准备完成,然后将数据拷贝到用户线程,当这一切都完成之后,内核会给用户线程发送一个信号,告诉它read操作完成了。也就说用户线程完全不需要知道实际的整个IO操作是如何进行的,只需要先发起一个请求,当接收内核返回的成功信号时表示IO操作已经完成,可以直接去使用数据了。

也就说在异步IO模型中IO操作的两个阶段都不会阻塞用户线程,这两个阶段都是由内核自动完成,然后发送一个信号告知用户线程操作已完成。用户线程中不需要再次调用IO函数进行具体的读写。这点是和信号驱动模型有所不同的,在信号驱动模型中,当用户线程接收到信号表示数据已经就绪,然后需要用户线程调用IO函数进行实际的读写操作而在异步IO模型中,收到信号表示IO操作已经完成,不需要再在用户线程中调用iO函数进行实际的读写操作

注意,异步IO是需要操作系统的底层支持在Java 7中,提供了Asynchronous IO

前面四种IO模型实际上都属于同步IO,只有最后一种是真正的异步IO,因为无论是多路复用IO还是信号驱动模型,IO操作的第2个阶段都会引起用户线程阻塞,也就是内核进行数据拷贝的过程都会让用户线程阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
	ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(8088);
// 主线程死循环等待新连接到来
while(!Thread.currentThread.isInturrupted()){
Socket socket = serverSocket.accept();
// 为新的连接创建新的线程
executor.submit(new ConnectIOnHandler(socket));
}

class ConnectIOnHandler extends Thread{
private Socket socket;
public ConnectIOnHandler(Socket socket){
this.socket = socket;
}
public void run(){
// 死循环处理读写事件
while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()) {
// 读取数据
String someThing = socket.read()....
if(someThing!=null) {
// 处理数据
......
// 写数据
socket.write()....
}
}
}
}

1.6 NIO

socket.accept()socket.read()socket.write() 三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。其实这也是所有使用多线程的本质: 1. 利用多核。 2. 当I/O阻塞系统,但CPU空闲的时候,可以利用多线程使用CPU资源。

现在的多线程一般都使用线程池,可以让线程的创建和回收成本相对较低。在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的I/O并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。

不过,这个模型最本质的问题在于,严重依赖于线程。但线程是很”贵”的资源,主要表现在:

  1. 线程的创建和销毁成本很高,在Linux这样的操作系统中,线程本质上就是一个进程。创建和销毁都是重量级的系统函数。
  2. 线程本身占用较大内存,像Java的线程栈,一般至少分配512K~1M的空间,如果系统中的线程数过千,恐怕整个JVM的内存都会被吃掉一半。
  3. 线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统load偏高、CPU sy使用率特别高(超过20%以上),导致系统几乎陷入不可用的状态。
  4. 容易造成锯齿状的系统负载。因为系统负载是用活动线程数或CPU核心数,一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。

所以,当面对十万甚至百万级连接的时候,传统的BIO模型是无能为力的。随着移动端应用的兴起和各种网络游戏的盛行,百万级长连接日趋普遍,此时,必然需要一种更高效的I/O处理模型。

socket.read() 为例子:

  • 传统的BIO里面 socket.read() ,如果TCP RecvBuffer里没有数据,函数会一直阻塞,直到收到数据,返回读到的数据。
  • 对于NIO,如果TCP RecvBuffer有数据,就把数据从网卡读到内存,并且返回给用户;反之则直接返回0,永远不会阻塞。
  • 最新的AIO(Async I/O)里面会更进一步:不但等待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。

换句话说,BIO里用户最关心“我要读”,NIO里用户最关心”我可以读了”,在AIO模型里用户更需要关注的是“读完了”。NIO一个重要的特点是:socket主要的读、写、注册和接收函数,在等待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。


二. NIO

2.1 简述

从JDK1.4开始,Java提供了一系列改进的输入/输出处理的新特性,被统称为NIO(Non-blocking I/O,在Java领域,也称为New I/O)。新增了许多用于处理输入输出的类,这些类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写,新增了满足NIO的功能。

NIO采用内存映射文件的方式来处理输入输出,NIO将文件或文件的一段区域映射到内存中,这样就可以像访问内存一样访问文件了。

NIO 与原来的 I/O 有同样的作用和目的,但实现方式不同。NIO基于块I/O,其效率可以比流 I/O 高许多。

2.2 内存映射

操作系统可以利用虚拟内存来将一个文件或文件其中一部分映射到内存中,可以像访问内存数组一样去处理文件,这比传统的文件操作要快很多。

java.nio 包提供了内存映射的办法,首先从文件中获取一个通道 Channel通道就是对磁盘文件的一个抽象,可以通过通道来访问内存映射、文件加锁机制以及文件间快速数据传递等操作系统特性。Channel 封装了对数据源的操作,数据源可以是 文件Socket 等,Channel 主要用来做非阻塞式读写

2.3 流与块的比较

原来的 I/O 库(在 java.io.* 中) 与 NIO 最重要的区别是数据打包和传输的方式。正如前面提到的,原来的 I/O 以流的方式处理数据,而 NIO 以块的方式处理数据。面向流 的 I/O 系统一次一个字节地处理数据。一个输入流产生一个字节的数据,一个输出流消费一个字节的数据。为流式数据创建过滤器非常容易。链接几个过滤器,以便每个过滤器只负责单个复杂处理机制的一部分,这样也是相对简单的。不利的一面是,面向流的 I/O 通常相当慢

一个 面向块 的 I/O 系统以块的形式处理数据。每一个操作都在一步中产生或者消费一个数据块。按块处理数据比按(流式的)字节处理数据要快得多。但是面向块的 I/O 缺少一些面向流的 I/O 所具有的优雅性和简单性。在NIO中有几个核心对象需要掌握:缓冲区(Buffer)、通道(Channel)、选择器(Selector)。

2.4 IO 与 NIO 的区别

IO NIO
面向流 面向缓冲
阻塞IO 非阻塞IO
选择器

2.4.1 面向流和面向缓冲

Java IO和NIO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

2.4.2 阻塞IO与非阻塞IO

  • Java IO的各种流是阻塞的。这意味着,当一个线程调用 read()write() 时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
  • Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

2.4.3 选择器

Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。

2.5 缓冲区-Buffer

2.5.1 概述

缓冲区实际上是一个容器对象,简单的看像是一个数组,在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,它也是写入到缓冲区中的;任何时候访问 NIO 中的数据,都是将它放到缓冲区中。而在面向流I/O系统中,所有数据都是直接写入或者直接将数据读取到Stream对象中。具体看下面这张图就理解了:

上面的图描述了从一个客户端向服务端发送数据,然后服务端接收数据的过程。客户端发送数据时,必须先将数据存入Buffer中,然后将Buffer中的内容写入通道。服务端这边接收数据必须通过Channel将数据读入到Buffer中,然后再从Buffer中取出数据来处理。

2.5.2 类结构和API

在NIO中,所有的缓冲区类型都继承于抽象类Buffer,支持顺序和随机数据访问,其有一系列子类ByteBuffer,CharBuffer等,最常用的就是ByteBuffer,StringBuffer并不属于这一系列。

对于Java中的基本类型,基本都有一个具体Buffer类型与之相对应,它们之间的继承关系如下图所示:

抽象类Buffer的四个关键属性:

属 性 作 用
capacity 容量,指缓冲区能够容纳的数据元素的最大数量,这一容量在缓冲区创建时被设定,并且永远不能被改变
limit 上界,指缓冲区的第一个不能被读或写的元素,或者说是,缓冲区中现存元素的计数
position 位置,指下一个要被读或写的元素的索引,位置会自动由相应的get()和put()函数更新
mark 标记,指一个备忘位置,调用mark()来设定mark=position,调用reset()来设定postion=mark,标记未设定前是未定义的

Buffer中提供了以下的一些方法:

方 法 作 用
Object array() 返回此缓冲区的底层实现数组
int arrayOffset() 返回此缓冲区的底层实现数组中第一个缓冲区还俗的偏移量
int capacity() 返回此缓冲区的容量
Buffer clear() 清除此缓冲区
Buffer flip() 反转此缓冲区
boolean hasArray() 告知此缓冲区是否具有可访问的底层实现数组
boolean hasRemaining() 告知在当前位置和限制之间是否有元素
boolean isDirect() 告知此缓冲区是否为直接缓冲区
boolean isReadOnly() 告知此缓冲区是否为只读缓存
int limit() 返回此缓冲区的上界
Buffer limit(int newLimit) 设置此缓冲区的上界
Buffer mark() 在此缓冲区的位置设置标记
int position() 返回此缓冲区的位置
Buffer position(int newPosition) 设置此缓冲区的位置
int remaining() 返回当前位置与上界之间的元素数
Buffer reset() 将此缓冲区的位置重置为以前标记的位置
Buffer rewind() 重绕此缓冲区

2.5.3 使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestIntBuffer {
public static void main(String[] args) {
// 分配新的int缓冲区,参数为缓冲区容量
// 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量。它将具有一个底层实现数组,其数组偏移量将为零。
IntBuffer buffer = IntBuffer.allocate(8);

for (int i = 0; i < buffer.capacity(); ++i) {
int j = 2 * (i + 1);
// 将给定整数写入此缓冲区的当前位置,当前位置递增
buffer.put(j);
}

// 重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为0
buffer.flip();

// 查看在当前位置和限制位置之间是否有元素
while (buffer.hasRemaining()) {
// 读取此缓冲区当前位置的整数,然后当前位置递增
int j = buffer.get();
System.out.print(j + " ");
}
}
}

运行后可以看到:

1
2  4  6  8  10  12  14  16 

2.5.4 缓冲区分片

通过 slice() 方法可以创建一个子缓冲区,也就是切出一块缓冲区,和原有的缓冲区是共享空间的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
// 缓冲区分片
ByteBuffer byteBuffer = ByteBuffer.allocate(10);

// 缓冲区中的数据0-9
for (int i=0; i<byteBuffer.capacity(); i++) {
byteBuffer.put((byte) i);
}

// 创建子缓冲区
byteBuffer.position(3);
byteBuffer.limit(7);
ByteBuffer slice = byteBuffer.slice();

// 改变子缓冲区的内容
for (int i=0; i<slice.capacity(); i++) {
byte b = slice.get(i);
b *= 10;
slice.put(i, b);
}

byteBuffer.position(0);
byteBuffer.limit(byteBuffer.capacity());

while (byteBuffer.remaining()>0) {
System.out.print(byteBuffer.get());
System.out.print(" ");
}
}

运行后可以看到如下结果,只有子缓冲区“可见的”那部分数据发生了变化,并且说明子缓冲区与原缓冲区是数据共享的。

1
0 1 2 30 40 50 60 7 8 9 

2.5.5 只读缓冲区

可以通过调用缓冲区的 asReadOnlyBuffer() 方法,将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据,只不过它是只读的。如果原缓冲区的内容发生了变化,只读缓冲区的内容也随之发生变化。

如果尝试修改只读缓冲区的内容,则会报 ReadOnlyBufferException 异常。只读缓冲区对于保护数据很有用。在将缓冲区传递给某个对象的方法时,无法知道这个方法是否会修改缓冲区中的数据。创建一个只读的缓冲区可以保证该缓冲区不会被修改。只可以把常规缓冲区转换为只读缓冲区,而不能将只读的缓冲区转换为可写的缓冲区。

2.5.6 状态变化

缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,如果我们使用 get() 方法从缓冲区获取数据或者使用 put() 方法把数据写入缓冲区,都会引起缓冲区状态的变化。

缓冲区结构就是一个数组,其四个属性 mark <= position <= limit <= capacity ,也就是 0<=标记<=位置<=界限<=容量

  1. 容量(Capacity):容纳元素最大数量,不可修改。指定了可以存储在缓冲区中的最大数据容量,实际上,它指定了底层数组的大小,或者至少是指定了准许我们使用的底层数组的容量。
  2. 上届(Limit):当前数据总数。指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。
  3. 位置(Position):下一个元素位置,下次读写所在位置。它的值由 get() / put() 方法自动更新,在新创建一个Buffer对象时,position被初始化为0。
  4. 标记(Mark):上一次读写的位置,可用于重复一个读写操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class Buffer {
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

Buffer(int mark, int pos, int lim, int cap) { // package-private
if (cap < 0)
throw createCapacityException(cap);
this.capacity = cap;
limit(lim);
position(pos);
if (mark >= 0) {
if (mark > pos)
throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")");
this.mark = mark;
}
}
......
}

缓冲区存取主要通过 put()get() 方法,可以通过 position() 读取指定位置的元素,当新建缓冲区时,limit为总量。不断通过 put() 把数据放入缓冲区,直到数据耗尽或到达容量,然后可以开始读取操作,若要从头开始读取应该调整limit,filp() 方法封装了这个操作。调用 filp() 将界限设为当前位置,即最后一个元素位置。不断调用get()将所有缓冲区数据读出后,调用 clear() 清空数据,来为下一次使用作准备,clear() 会将pos回归0,limit回归容量大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
public Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

public Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}

因为涉及到修改limit,而缓冲区又根据limit限制了读写操作,所以使用时要注意limit的变化不要影响到自己的读写操作。

调用mark记录当前位置,position会在读写操作后改变,可以通过 reset() 方法恢复position。

重读可以调用 rewind()mark / reset() 等方法,获取缓冲区可以通过 ByteBuffer.allocate()ByteBuffer.wrap() 等方法。

下面通过一个案例来了解状态变化的过程:创建一个256字节的缓冲区,初始 mark = -1,pos = 0,limit = 256,capacity = 256

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Test {
public static void main(String[] args){
// 初始化时
// mark = -1,pos = 0,limit = 256,capacity = 256
ByteBuffer byteBuffer = ByteBuffer.allocate(256);

// 从通道读取4个数据,也就是写入缓冲区4个数据
// mark = -1,pos = 4,limit = 256,capacity = 256
byteBuffer.put("abcd".getBytes());
System.out.println(byteBuffer.toString());//[pos=4 lim=256 cap=256]

// 接着把数据写入输出通道,也就是从缓冲区读取数据
// 首先要调用flip(),使输出时读取数据从0开始,并读取到limit为缓冲区所含数据
// mark = -1,pos = 0,limit = 4,capacity = 256
byteBuffer.flip();

// 接着调用get()开始从缓冲区读取数据
System.out.println(byteBuffer.toString());//[pos=0 lim=4 cap=256]
System.out.println(byteBuffer.get());//97
System.out.println(byteBuffer.toString());//[pos=1 lim=4 cap=256]
System.out.println(byteBuffer.get());//98
System.out.println(byteBuffer.toString());//[pos=2 lim=4 cap=256]
System.out.println(byteBuffer.get());//99
System.out.println(byteBuffer.toString());//[pos=3 lim=4 cap=256]
System.out.println(byteBuffer.get());//100
System.out.println(byteBuffer.toString());//[pos=4 lim=4 cap=256]
// mark = -1,pos = 4,limit = 4,capacity = 256
//继续调用get()会抛出java.nio.BufferUnderflowException
}
}

2.5.7 字节缓冲区**

字节缓冲区和其他缓冲区类型最明显的不同在于,它们可能成为通道所执行I/O的源头或目标,如果对NIO有了解的朋友们一定知道,通道只接收 ByteBuffer 作为参数。

如我们所知道的,操作系统在内存区域进行I/O操作,这些内存区域,就操作系统方面而言,是相连的字节序列。于是,毫无疑问,只有字节缓冲区有资格参与I/O操作。也请回想一下操作系统会直接存取进程—-在本例中是JVM进程的内存空间,以传输数据。这也意味着I/O操作的目标内存区域必须是连续的字节序列,在JVM中,字节数组可能不会在内存中连续存储,或者无用存储单元收集可能随时对其进行移动。在Java中,数组是对象,而数据存储在对象中的方式在不同的JVM实现中各有不同。

出于这一原因,引入了直接缓冲区的概念。直接缓冲区被用于与通道和固有I/O线程交互,它们通过使用固有代码来告知操作系统直接释放或填充内存区域,对用于通道直接或原始存取的内存区域中的字节元素的存储尽了最大的努力。

直接字节缓冲区通常是I/O操作最好的选择。在设计方面,它们支持JVM可用的最高效I/O机制,非直接字节缓冲区可以被传递给通道,但是这样可能导致性能损耗,通常非直接缓冲不可能成为一个本地I/O操作的目标,如果开发者向一个通道中传递一个非直接ByteBuffer对象用于写入,通道可能会在每次调用中隐含地进行下面的操作:

  1. 创建一个临时的直接ByteBuffer对象
  2. 将非直接缓冲区的内容复制到临时缓冲中
  3. 使用临时缓冲区执行低层次I/O操作
  4. 临时缓冲区对象离开作用于,并最终成为被回收的无用数据

这可能导致缓冲区在每个I/O上复制并产生大量对象,而这种事都是我们极力避免的。

直接缓冲区是I/O的最佳选择,但可能比创建非直接缓冲区要花费更高的成本。直接缓冲区使用的内存是通过调用本地操作系统的代码分配的,绕过了标准JVM堆栈。建立和销毁直接缓冲区会明显比具有堆栈的缓冲区更极爱破费,这取决于主操作系统以及JVM实现。直接缓冲区的内存区域不受无用存储单元收集支配,因为它们位于标准JVM堆栈之外。

直接ByteBuffer是通过调用具有所需容量的 ByteBuffer.allocateDirect() 函数产生的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class ByteBuffer
extends Buffer
implements Comparable<ByteBuffer> {

public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw createCapacityException(capacity);
return new HeapByteBuffer(capacity, capacity);//返回局部内部类HeapByteBuffer
}
...
public static ByteBuffer allocateDirect(int capacity)
{
return new DirectByteBuffer(capacity);
}
...
}

可以使用 通道 + 缓冲区 的方式来代替随机访问文件,会提高读写性能。

2.6 通道-Channel**

2.6.1 概述**

通道用于在字节缓冲区和另一侧的实体(如文件、Socket等)之间传输数据,通常会与操作系统的文件描述符(FileDescriptor)和文件句柄(FileHandler)有着一对一的关系。虽然通道比文件描述符更广义,但开发者经常使用到的多数通道都是连接到开放的文件描述符的。Channel类提供维持平台独立性所需的抽象过程,不然仍然会模拟现代操作系统本身的I/O性能。

通道是一种途径,借助该途径,可以用最小的总开销来访问操作系统本身的I/O服务。缓冲区则是通道内部用来发送和接收数据的端点

Channel和Stream的区别?Channel和传统IO中的Stream很相似。虽然很相似,但是有很大的区别,主要区别为:通道是双向的,通过一个Channel既可以进行读,也可以进行写;而Stream只能进行单向操作,通过一个Stream只能进行读或者写,比如InputStream只能进行读取操作,OutputStream只能进行写操作;

通道是一个对象,通过它可以读取和写入数据,当然了所有数据都通过Buffer对象来处理。我们永远不会将字节直接写入通道中,相反是将数据写入包含一个或者多个字节的缓冲区。同样不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

Channel提供了 Scatter / Gather 功能,也叫本地矢量I/O,就是在多个缓冲区上实现一个简单的I/O操作。比如一个write操作,数据从不同缓冲区按顺序抽取即 gather ,通过Channel发送出去。相反read操作就是按顺序散布 scatter 。缓冲区自身不需要具备 Scatter / Gather 功能,目前大部分操作系统都支持这种本地矢量I/O。

1
2
ByteBuffer[] bufferArray = {buffer1, buffer2};
socketChannel.write(bufferArray);

2.6.2 类结构**

Channel本身是一个接口,定义了方法 isOpen()close() 。所以对所有通道来说只有两种共同的操作:检查一个通道是否打开 isOpen() 和关闭一个打开的通道 close() ,其余所有的东西都是那些实现Channel接口以及它的子接口的类。

1
2
3
4
5
6
public interface Channel extends Closeable {

public boolean isOpen();

public void close() throws IOException;
}

Channel分为 FileChannelSocketChannel 两类。

SocketChannel 又分为如下三类。

  1. ServerSocketChannel-监听端口
  2. SocketChannel-TCP通信
  3. DatagramChannel-UDP通信

在NIO中,提供了多种通道对象,而所有的通道对象都实现了Channel接口。它们之间的继承关系如下图所示:

Channel(通道)表示到实体如硬件设备、文件、网络套接字或可以执行一个或多个不同I/O操作的程序组件的开放的连接。所有的Channel都不是通过构造器创建的,而是通过传统的节点InputStream、OutputStream的getChannel方法来返回响应的Channel。

Channel中最常用的三个类方法就是map、read和write,其中map方法用于将Channel对应的部分或全部数据映射成ByteBuffer,而read或write方法有一系列的重载形式,这些方法用于从Buffer中读取数据或向Buffer中写入数据。

2.6.3 文件通道**

文件通道总是阻塞式的,因此不能被置于非阻塞模式下。

通道可以以多种方式创建。Socket通道可以有直接创建Socket通道的工厂方法,但是一个FileChannel对象却只能通过在一个打开的RandomAccessFile、FileInputStream或FileOutputStream对象上调用 getChannel() 方法来获取,开发者不能直接创建一个FileChannel。

调用 getChannel() 方法会返回一个连接到相同文件的FileChannel对象且该FileChannel对象具有与file对象相同的访问权限,然后就可以使用通道对象来利用强大的FileChannel API了。

FileChannel对象是线程安全的,多个进程可以在同一个实例上并发调用方法而不会引起任何问题,不过并非所有的操作都是多线程的。影响通道位置或者影响文件的操作都是单线程的,如果有一个线程已经在执行会影响通道位置或文件大小的操作,那么其他尝试进行此类操作之一的线程必须等待,并发行为也会受到底层操作系统或文件系统的影响。

从文件中读出数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws Exception {
File file = new File("D:/files/readchannel.txt");
FileInputStream fis = new FileInputStream(file);
FileChannel fc = fis.getChannel();
ByteBuffer bb = ByteBuffer.allocate(35);
fc.read(bb);
bb.flip();
while (bb.hasRemaining())
{
System.out.print((char)bb.get());
}
bb.clear();
fc.close();
}

使用文件通道写数据。

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws Exception {
File file = new File("D:/files/writechannel.txt");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
FileChannel fc = raf.getChannel();
ByteBuffer bb = ByteBuffer.allocate(10);
String str = "abcdefghij";
bb.put(str.getBytes());
bb.flip();
fc.write(bb);
bb.clear();
fc.close();
}

注意:通道只能使用ByteBuffer

通过调用FileChannel类的 map() 方法可以获取一个ByteBuffer缓冲区,可以指定想要映射的文件区域与映射模式。

映射模式分为三种:

  • FileChannel.MapMode.READ_ONLY 缓冲区只读,尝试写入抛出 ReadOnlyBufferException 异常。
  • FileChannel.MapMode.READ_WRITE 缓冲区可写,写操作会在某个时刻回写到文件中,但其它同步进行程序不一定可以立刻看到修改。
  • FileChannel.MapMode.PRIVATE 缓冲区可写,但只在缓冲区修改,文件并不会被影响。

2.6.4 NIO案例**

(1) 使用NIO读取数据**

在前面我们说过,任何时候读取数据,都不是直接从通道读取,而是从通道读取到缓冲区。所以使用NIO读取数据可以分为下面三个步骤:

  1. 从FileInputStream获取Channel。
  2. 创建Buffer。
  3. 将数据从Channel读取到Buffer中。

下面是一个简单的使用NIO从文件中读取数据的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Program {  
public static void main( String args[] ) throws Exception {
FileInputStream fin = new FileInputStream("c:\\test.txt");

// 获取通道
FileChannel fc = fin.getChannel();

// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取数据到缓冲区
fc.read(buffer);

buffer.flip();

while (buffer.remaining()>0) {
byte b = buffer.get();
System.out.print(((char)b));
}

fin.close();
}
}
(2) 使用NIO写入数据**

使用NIO写入数据与读取数据的过程类似,同样数据不是直接写入通道,而是写入缓冲区,可以分为下面三个步骤:

  1. 从FileInputStream获取Channel
  2. 创建Buffer
  3. 将数据从Channel写入到Buffer中

下面是一个简单的使用NIO向文件中写入数据的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Program {

private static final byte message[] = { 83, 111, 109, 101, 32,
98, 121, 116, 101, 115, 46 };

public static void main( String args[] ) throws Exception {
FileOutputStream fout = new FileOutputStream( "c:\\test.txt" );

FileChannel fc = fout.getChannel();

ByteBuffer buffer = ByteBuffer.allocate( 1024 );

for (int i=0; i<message.length; ++i) {
buffer.put( message[i] );
}

buffer.flip();

fc.write( buffer );

fout.close();
}
}

2.7 选择器-Selector**

2.7.1 概述**

Selector即选择器,也叫多路复用器。可以用来检查一个或多个通道的状态是否可读可写,可以通过其实现单线程管理多个通道,管理多个网络连接。所以NIO实际上是一个多路复用IO

这样的设计使得只有在连接真正有读写事件发生时,才会调用函数来进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,并且避免了多线程之间的上下文切换导致的开销。

Channel通过 write()read() 方法读取和写入字节流,相较于通过Stream读取字节流其特点是封装了异步操作,且具有双向性,能同时支持读写

与Selector有关的一个关键类是SelectionKey,一个SelectionKey表示一个到达的事件,这2个类构成了服务端处理业务的关键逻辑。

某种程度上来说,理解选择器比理解缓冲区和通道类更困难一些和复杂一些,因为涉及了三个主要的类,它们都会同时参与到这整个过程中。

  • 选择器(Selector):选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态
  • 可选择通道(SelectableChannel):这个抽象类提供了实现通道的可选择性所需要的公共方法,它是所有支持就绪检查的通道类的父类,FileChannel对象不是可选择的,因为它们没有继承SelectableChannel,所有Socket通道都是可选择的,包括从管道(Pipe)对象中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以设定对哪个选择器而言哪种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。
  • 选择键(SelectionKey):选择键封装了特定的通道与特定的选择器的注册关系。调用 SelectableChannel.register() 方法会返回选择键并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。

这里先将选择器的执行分解为几条细节:

  1. 创建一个或者多个可选择的通道(SelectableChannel)。
  2. 将这些创建的通道注册到选择器对象中。
  3. (SelectionKey)选择键会记住开发者关心的通道,它们也会追踪对应的通道是否已经就绪。
  4. 开发者调用一个选择器对象的 select() 方法,当方法从阻塞状态返回时,选择键会被更新。
  5. 获取选择键的集合,找到当时已经就绪的通道,通过遍历这些键,开发者可以选择对已就绪的通道要做的操作。

2.7.2 使用示例**

创建选择器,并将其注册到通道。

1
2
3
4
5
6
7
8
9
10
11
12
//创建SocketChannel和Selector对象
SocketChannel socketChannel = SocketChannel.open();
Selector selector = Selector.open();

//配置socketChannel为非阻塞模式
socketChannel.configureBlocking(false);

//注册socketChannel到selector,channel必须继承AbstractSelectableChannel
//SelectionKey表示通道对象和选择器对象之间的注册关系
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
//select()方法在将线程置于睡眠状态直到这些感兴趣的事件中的一个发生或者10秒钟过去,这就是所谓的事件驱动。
ready = selector.select(10000);

2.7.3 如何结合事件模型使用NIO同步非阻塞特性**

回忆BIO模型,之所以需要多线程,是因为在进行I/O操作的时候,一是没有办法知道到底能不能写、能不能读,只能”傻等”,即使通过各种估算,算出来操作系统没有能力进行读写,也没法在socket.read()和socket.write()函数中返回,这两个函数无法进行有效的中断。所以除了多开线程另起炉灶,没有好的办法利用CPU。

NIO的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来,记录的方式通常是在Selector上注册标记位,然后切换到其它就绪的连接(channel)继续进行读写。

下面具体看下如何利用事件模型单线程处理所有I/O请求:

NIO的主要事件有几个:读就绪、写就绪、有新连接到来。

我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。

其次,用一个死循环选择就绪的事件,会执行系统调用(Linux 2.6之前是 selectpoll ,2.6之后是 epoll ,Windows是 IOCP),还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。

注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以你可以放心大胆地在一个 while(true) 里面调用这个函数而不用担心CPU空转。

所以我们的程序大概的模样是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
interface ChannelHandler{
void channelReadable(Channel channel);
void channelWritable(Channel channel);
}

class Channel{
Socket socket;
Event event;//读,写或者连接
}

//IO线程主循环:
class IoThread extends Thread{
public void run(){
Channel channel;
while(channel=Selector.select()){//选择就绪的事件和对应的连接
if(channel.event==accept){
registerNewChannelHandler(channel);//如果是新连接,则注册一个新的读写处理器
}
if(channel.event==write){
getChannelHandler(channel).channelWritable(channel);//如果可以写,则执行写事件
}
if(channel.event==read){
getChannelHandler(channel).channelReadable(channel);//如果可以读,则执行读事件
}
}
}
Map<Channel,ChannelHandler> handlerMap;//所有channel的对应事件处理器
}

这个程序很简短,也是最简单的Reactor模式注册所有感兴趣的事件处理器,单线程轮询选择就绪事件,执行事件处理器

2.7.4 优化线程模型**

由上面的示例我们大概可以总结出NIO是怎么解决掉线程的瓶颈并处理海量连接的:

NIO由原来的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。

并且由于线程的节约,连接数大的时候因为线程切换带来的问题也随之解决,进而为处理海量连接提供了可能。

单线程处理I/O的效率确实非常高,没有线程切换,只是拼命的读、写、选择事件。但现在的服务器,一般都是多核处理器,如果能够利用多核心进行I/O,无疑对效率会有更大的提高。

仔细分析一下我们需要的线程,其实主要包括以下几种:

  1. 事件分发器,单线程选择就绪的事件。
  2. I/O处理器,包括connect、read、write等,这种纯CPU操作,一般开启CPU核心个线程就可以。
  3. 业务线程,在处理完I/O后,业务一般还会有自己的业务逻辑,有的还会有其他的阻塞I/O,如DB操作,RPC等。只要有阻塞,就需要单独的线程。

Java的Selector对于Linux系统来说,有一个致命限制:同一个channel的select不能被并发的调用。因此,如果有多个I/O线程,必须保证:一个socket只能属于一个IoThread,而一个IoThread可以管理多个socket。

另外连接的处理和读写的处理通常可以选择分开,这样对于海量连接的注册和读写就可以分发。虽然 read()write() 是比较高效无阻塞的函数,但毕竟会占用CPU,如果面对更高的并发则无能为力。

通过上面的分析,可以看出NIO在服务端对于解放线程,优化I/O和处理海量连接方面,确实有自己的用武之地。那么在客户端上,NIO又有什么使用场景呢?

常见的 客户端BIO + 连接池 模型,可以建立n个连接,然后当某一个连接被I/O占用的时候,可以使用其他连接来提高性能。

但多线程的模型面临和服务端相同的问题:如果指望增加连接数来提高性能,则连接数又受制于线程数、线程很贵、无法建立很多线程,则性能遇到瓶颈。

2.7.5 每连接顺序请求的Redis**

对于Redis来说,由于服务端是全局串行的,能够保证同一连接的所有请求与返回顺序一致。这样可以使用 单线程队列 ,把请求数据缓冲。然后pipeline发送,返回future,然后channel可读时,直接在队列中把future取回来,done() 就可以了。

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class RedisClient Implements ChannelHandler{
private BlockingQueue CmdQueue;
private EventLoop eventLoop;
private Channel channel;

class Cmd{
String cmd;
Future result;
}

public Future get(String key){
Cmd cmd= new Cmd(key);
queue.offer(cmd);
eventLoop.submit(new Runnable(){
List list = new ArrayList();
queue.drainTo(list);
if(channel.isWritable()){
channel.writeAndFlush(list);
}
});
}

public void ChannelReadFinish(Channel channel,Buffer Buffer){
List result = handleBuffer();//处理数据
//从cmdQueue取出future,并设值,future.done();
}

public void ChannelWritable(Channel channel){
channel.flush();
}
}

这样做,能够充分的利用pipeline来提高I/O能力,同时获取异步处理能力。

2.7.6 多连接短连接的HttpClient**

类似于竞对抓取的项目,往往需要建立无数的HTTP短连接,然后抓取,然后销毁,当需要单机抓取上千网站线程数又受制的时候,怎么保证性能呢?

何不尝试NIO,单线程进行连接、写、读操作?如果连接、读、写操作系统没有能力处理,简单的注册一个事件,等待下次循环就好了。

如何存储不同的请求/响应呢?由于http是无状态没有版本的协议,又没有办法使用队列,好像办法不多。比较笨的办法是对于不同的socket,直接存储socket的引用作为map的key。

2.7.7 常见的RPC框架,如Thrift,Dubbo**

这种框架内部一般维护了请求的协议和请求号,可以维护一个以请求号为key,结果的result为future的map,结合 NIO + 长连接 ,获取非常不错的性能。

2.7.8 Proactor与Reactor**

一般情况下,I/O 复用机制需要事件分发器(event dispatcher)。 事件分发器的作用,即将那些读写事件源分发给各读写事件的处理者,就像送快递的在楼下喊: 谁谁谁的快递到了, 快来拿吧!开发人员在开始的时候需要在分发器那里注册感兴趣的事件,并提供相应的处理者(event handler),或者是回调函数;事件分发器在适当的时候,会将请求的事件分发给这些handler或者回调函数。

涉及到事件分发器的两种模式称为:ReactorProactor。 Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的。在Reactor模式中,事件分发器等待某个事件或者可应用或个操作的状态发生(比如文件描述符可读写,或者是socket可读写),事件分发器就把这个事件传给事先注册的事件处理函数或者回调函数,由后者来做实际的读写操作。

而在Proactor模式中,事件处理者(或者代由事件分发器发起)直接发起一个异步读写操作(相当于请求),而实际的工作是由操作系统来完成的。发起时,需要提供的参数包括用于存放读到数据的缓存区、读的数据大小或用于存放外发数据的缓存区,以及这个请求完后的回调函数等信息。事件分发器得知了这个请求,它默默等待这个请求的完成,然后转发完成事件给相应的事件处理者或者回调。举例来说,在Windows上事件处理者投递了一个异步IO操作(称为overlapped技术),事件分发器等IO Complete事件完成。这种异步模式的典型实现是基于操作系统底层异步API的,所以我们可称之为“系统级别”的或者“真正意义上”的异步,因为具体的读写是由操作系统代劳的。

举个例子,将有助于理解Reactor与Proactor二者的差异,以读操作为例(写操作类似)。

在Reactor中实现读

  • 注册读就绪事件和相应的事件处理器。
  • 事件分发器等待事件。
  • 事件到来,激活分发器,分发器调用事件对应的处理器。
  • 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权。

在Proactor中实现读:

  • 处理器发起异步读操作(注意:操作系统必须支持异步IO)。在这种情况下,处理器无视IO就绪事件,它关注的是完成事件。
  • 事件分发器等待操作完成事件。
  • 在分发器等待过程中,操作系统利用并行的内核线程执行实际的读操作,并将结果数据存入用户自定义缓冲区,最后通知事件分发器读操作完成。
  • 事件分发器呼唤处理器。
  • 事件处理器处理用户自定义缓冲区中的数据,然后启动一个新的异步操作,并将控制权返回事件分发器。

可以看出,两个模式的相同点,都是对某个I/O事件的事件通知(即告诉某个模块,这个I/O操作可以进行或已经完成)。在结构上,两者也有相同点:事件分发器负责提交IO操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调handler;不同点在于,异步情况下(Proactor),当回调handler时,表示I/O操作已经完成;同步情况下(Reactor),回调handler时,表示I/O设备可以进行某个操作( can readcan write )。

下面,我们将尝试应对为Proactor和Reactor模式建立可移植框架的挑战。在改进方案中,我们将Reactor原来位于事件处理器内的Read/Write操作移至分发器(不妨将这个思路称为“模拟异步”),以此寻求将Reactor多路同步I/O转化为模拟异步I/O。以读操作为例子,改进过程如下: - 注册读就绪事件和相应的事件处理器。并为分发器提供数据缓冲区地址,需要读取数据量等信息。 - 分发器等待事件(如在 select() 上等待)。 - 事件到来,激活分发器。分发器执行一个非阻塞读操作(它有完成这个操作所需的全部信息),最后调用对应处理器。 - 事件处理器处理用户自定义缓冲区的数据,注册新的事件(当然同样要给出数据缓冲区地址,需要读取的数据量等信息),最后将控制权返还分发器。 如我们所见,通过对多路I/O模式功能结构的改造,可将Reactor转化为Proactor模式。改造前后,模型实际完成的工作量没有增加,只不过参与者间对工作职责稍加调换。没有工作量的改变,自然不会造成性能的削弱。对如下各步骤的比较,可以证明工作量的恒定:

标准/典型的Reactor:

  • 步骤1:等待事件到来(Reactor负责)。
  • 步骤2:将读就绪事件分发给用户定义的处理器(Reactor负责)。
  • 步骤3:读数据(用户处理器负责)。
  • 步骤4:处理数据(用户处理器负责)。

改进实现的模拟Proactor:

  • 步骤1:等待事件到来(Proactor负责)。
  • 步骤2:得到读就绪事件,执行读数据(现在由Proactor负责)。
  • 步骤3:将读完成事件分发给用户处理器(Proactor负责)。
  • 步骤4:处理数据(用户处理器负责)。

对于不提供异步I/O API的操作系统来说,这种办法可以隐藏Socket API的交互细节,从而对外暴露一个完整的异步接口。借此,我们就可以进一步构建完全可移植的,平台无关的,有通用对外接口的解决方案。

代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
interface ChannelHandler{
void channelReadComplate(Channel channel,byte[] data);
void channelWritable(Channel channel);
}

class Channel{
Socket socket;
Event event;//读,写或者连接
}

//IO线程主循环:
class IoThread extends Thread{
public void run(){
Channel channel;
while(channel=Selector.select()){//选择就绪的事件和对应的连接
if(channel.event==accept){
registerNewChannelHandler(channel);//如果是新连接,则注册一个新的读写处理器
Selector.interested(read);
}
if(channel.event==write){
getChannelHandler(channel).channelWritable(channel);//如果可以写,则执行写事件
}
if(channel.event==read){
byte[] data = channel.read();
if(channel.read()==0)//没有读到数据,表示本次数据读完了
{
getChannelHandler(channel).channelReadComplate(channel,data);//处理读完成事件
}
if(过载保护){
Selector.interested(read);
}
}
}
}
Map<Channel,ChannelHandler> handlerMap;//所有channel的对应事件处理器
}

2.7.9 Selector.wakeup()**

主要作用?

  • 解除阻塞在Selector.select()/select(long)上的线程,立即返回。
  • 两次成功的select之间多次调用wakeup等价于一次调用。
  • 如果当前没有阻塞在select上,则本次wakeup调用将作用于下一次select——“记忆”作用。

为什么要唤醒?

  • 注册了新的channel或者事件。
  • channel关闭,取消注册。
  • 优先级更高的事件触发(如定时器事件),希望及时处理。

原理?

Linux上利用pipe调用创建一个管道,Windows上则是一个loopback的tcp连接。这是因为win32的管道无法加入select的fd set,将管道或者TCP连接加入select fd set。

wakeup往管道或者连接写入一个字节,阻塞的select因为有I/O事件就绪,立即返回。可见,wakeup的调用开销不可忽视。

2.7.10 Buffer的选择**

通常情况下,操作系统的一次写操作分为两步:

  1. 将数据从用户空间拷贝到系统空间。

  2. 从系统空间往网卡写。同理,读操作也分为两步:

    ① 将数据从网卡拷贝到系统空间;

    ② 将数据从系统空间拷贝到用户空间。

对于NIO来说,缓存的使用可以使用DirectByteBuffer和HeapByteBuffer。如果使用了DirectByteBuffer,一般来说可以减少一次系统空间到用户空间的拷贝。但Buffer创建和销毁的成本更高,更不宜维护,通常会用内存池来提高性能。

如果数据量比较小的中小应用情况下,可以考虑使用heapBuffer;反之可以用directBuffer。

使用NIO != 高性能,当连接数<1000,并发程度不高或者局域网环境下NIO并没有显著的性能优势。

NIO并没有完全屏蔽平台差异,它仍然是基于各个操作系统的I/O系统实现的,差异仍然存在。使用NIO做网络编程构建事件驱动模型并不容易,陷阱重重。

推荐大家使用成熟的NIO框架,如Netty,MINA等。解决了很多NIO的陷阱,并屏蔽了操作系统的差异,有较好的性能和编程模型。

最后总结一下到底NIO给我们带来了些什么:

  • 事件驱动模型
  • 避免多线程
  • 单线程处理多任务
  • 非阻塞I/O,I/O读写不再阻塞,而是返回0
  • 基于block的传输,通常比基于流的传输更高效
  • 更高级的IO函数,zero-copy
  • IO多路复用大大提高了Java网络应用的可伸缩性和实用性

2.7.11 源码**

Selector是通过调用静态工厂方法 open() 来实例化的,这个从前面的代码里面也看到了,选择器不是像通道或流那样的基本I/O对象—-数据从来没有通过他们进行传递。

通道是调用 register() 方法注册到选择器上的,从上面代码里可以看到 register() 方法接受一个Selector对象作为参数,以及一个名为 ops 的整数型参数,第二个参数表示关心的通道操作。在JDK1.4中,有四种被定义的可选择操作:(read)、(write)、连接(connect)和接受(accept)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public abstract class Selector implements Closeable {
protected Selector() { }

public static Selector open() throws IOException {//创建一个Selector对象,打开选择器
return SelectorProvider.provider().openSelector();
}

public abstract boolean isOpen();//判断Selector对象是否打开

public abstract SelectorProvider provider();//返回创建此channel的provider

public abstract Set<SelectionKey> keys();//返回此选择器的所有键集合,键集合不能直接修改,只能通过取消SelectionKey注销SocketChannel的方法移除SelectionKey

public abstract Set<SelectionKey> selectedKeys();//返回此选择器的选定键集合,键可以直接移除,但不能直接加入键集合

public abstract int selectNow() throws IOException;//选择一组准备好进行I/O操作的键,此方法执行非阻塞的选择操作,若自上个选择操作之后没有通道可以选择了就返回0,调用此方法会清除之前调用wakeup()的效果。

public abstract int select(long timeout) throws IOException;//选择一组准备好进行I/O操作的键,此方法执行非阻塞的选择操作,只有选择了至少一个通道后,或当前线程被中断,或选择器调用了wakeup()后才会返回

public abstract int select() throws IOException;//选择一组准备好进行I/O操作的键,此方法执行阻塞的选择操作,只有选择了至少一个通道后,或当前线程被中断,或选择器调用了wakeup()后才会返回

public abstract Selector wakeup();//使选择器当前尚未返回的第一个选择操作立即返回。若另一个线程在当前被select()或select(long timeout)阻塞,调用此方法会立即返回选择器。若当前没有正在进行的选择操作,则下次调用这些方法时其中一个会立即返回,除非同时调用selectNow()。除非同时再次调用此方法,否则select()或select(long timeout)方法的后续调用仍会继续阻塞。在两个连续选择之间多次调用此方法,效果等同于只调用一次。

public abstract void close() throws IOException;//关闭选择器。
}

选择键源码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public abstract class SelectionKey {
protected SelectionKey() { }

// -- Channel and selector operations --

public abstract SelectableChannel channel();//返回该SelectionKey对应的channel。

public abstract Selector selector();//返回该SelectionKey对应的Selector。

public abstract boolean isValid();

public abstract void cancel();

// -- Operation-set accessors --

public abstract int interestOps();//返回代表需要Selector监控的IO操作的bit mask

public abstract SelectionKey interestOps(int ops);

public abstract int readyOps();//返回一个bit mask,代表在相应channel上可以进行的IO操作。

// -- Operation bits and bit-testing convenience methods --

public static final int OP_READ = 1 << 0;//监听事件-读就绪

public static final int OP_WRITE = 1 << 2;//监听事件-写就绪

public static final int OP_CONNECT = 1 << 3;//监听事件-连接就绪

public static final int OP_ACCEPT = 1 << 4;//监听事件-接收就绪

public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}

public final boolean isWritable() {
return (readyOps() & OP_WRITE) != 0;
}

public final boolean isConnectable() {
return (readyOps() & OP_CONNECT) != 0;
}

public final boolean isAcceptable() {
return (readyOps() & OP_ACCEPT) != 0;
}

// -- Attachments --
private volatile Object attachment;//

private static final AtomicReferenceFieldUpdater<SelectionKey,Object> attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater(SelectionKey.class, Object.class, "attachment");

public final Object attach(Object ob) {
return attachmentUpdater.getAndSet(this, ob);
}

public final Object attachment() {
return attachment;
}
}
  1. 就像前面提到的,一个键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系,channel() 方法和 selector() 方法反映了这种关系。

  2. 开发者可以使用 cancel() 方法终结这种关系,可以使用 isValid() 方法来检查这种有效的关系是否仍然存在,可以使用 readyOps() 方法来获取相关的通道已经就绪的操作。

  3. 第2点有提到 readyOps() 方法,不过我们往往不需要使用这个方法,SelectionKey类定义了四个便于使用的布尔方法来为开发者测试通道的就绪状态,例如:

    1
    if (key.isWritable()){...}

    这种写法就等价于:

    1
    if ((key.readyOps() & SelectionKeys.OPWRITE) != 0){...}

    isWritable()isReadable()isConnectable()isAcceptable() 四个方法在任意一个SelectionKey对象上都能安全地调用。

  4. 当通道关闭时,所有相关的键会自动取消(一个通道可以被注册到多个选择器上);当选择器关闭时,所有被注册到该选择器的通道都会被注销并且相关的键立即被取消。

接着就是Selector的核心选择过程了。基本上来说,选择器是对 select()poll()epoll() 等本地调用或者类似的操作系统特定的系统调用的一个包装。但是Selector所做的不仅仅是简单地向本地代码传送参数,每个操作都有特定的过程,对这个过程的理解是合理地管理键和它们所表示的状态信息的基础。

选择操作是当三种形式的 select() 中的任意一种被调用时,由选择器执行的。不管是哪一种形式的调用,下面步骤将被执行:

  1. 已取消的键的集合将会被检查。如果它是非空的,每个已取消的键的集合中的键将从另外两个集合中移除,并且相关的通道将被注销。此步骤结束,已取消的键的集合将是空的。
  2. 已注册的键的集合中的键的interest集合将被检查,此步骤结束,对interest集合的改动不会影响剩余的检查过程。一旦就绪条件被定下来,底层操作系统将会进行查询,以确定每个通道所关心的操作的真实就绪状态,依赖于特定的 select() 方法调用,如果没有通道已经准备好,线程可能会在这时阻塞,通常会有一个超时值。
  3. 步骤2可能会花费很长时间,特别是线程处于阻塞状态时。与该选择器相关的键可能会同时被取消,当步骤2结束时,步骤1将重新执行,以完成任意一个在选择进行的过程中,键已经被取消的通道的注册。
  4. select操作的返回值不是已准备好的通道的总数,而是从上一个 select() 调用之后进入就绪状态的通道的数量。之前的调用中就绪的,并且在本次调用中仍然就绪的通道不会被计入,而那些在前一次调用中已经就绪但已经不再处于就绪状态的通道也不会被计入。

2.7.12 案例**

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
try {
//创建SocketChannel和Selector对象,注册socketChannel到selector
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("127.0.0.1",8000));
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector,SelectionKey.OP_ACCEPT);

//创建读写缓冲区
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
ByteBuffer writeBuffer = ByteBuffer.allocate(128);
writeBuffer.put("received".getBytes());
writeBuffer.flip();

while (true){
int ready = selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();

while (it.hasNext()){
SelectionKey key = it.next();
it.remove();

if(key.isAcceptable()){
// 创建新的连接,并且把连接注册到selector上,而且,
// 声明这个channel只对读操作感兴趣。
SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
readBuffer.clear();
socketChannel.read(readBuffer);

readBuffer.flip();
System.out.println("received : " + new String(readBuffer.array()));
key.interestOps(SelectionKey.OP_WRITE);
}else if(key.isWritable()){
writeBuffer.rewind();
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(writeBuffer);
key.interestOps(SelectionKey.OP_READ);
}
}
}

}catch (IOException ex){
ex.printStackTrace();
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));

ByteBuffer writeBuffer1 = ByteBuffer.allocate(128);
ByteBuffer writeBuffer2 = ByteBuffer.allocate(128);
ByteBuffer readBuffer = ByteBuffer.allocate(16);
writeBuffer1.put("hello ".getBytes());
writeBuffer2.put("world".getBytes());

writeBuffer1.flip();
writeBuffer2.flip();
ByteBuffer[] bufferArray = {writeBuffer1, writeBuffer2};
while (true){
writeBuffer1.rewind();
writeBuffer2.rewind();
socketChannel.write(bufferArray);
readBuffer.clear();
socketChannel.read(readBuffer);
//socketChannel.close();
}
} catch (IOException e) {
ex.printStackTrace();
}

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
received : hello world 
received : hello world
received : hello world
received : hello world
received : hello world
received : hello world
received : hello world
received : hello world
received : hello world
received : hello world
received : hello world
received : hello world
received : hello world

第三节 文件加锁机制**

实际使用中不难避免同时多个执行的程序对文件读写的情况,所以需要某种机制来避免这类情况造成文件损坏,文件锁可以控制对文件或文件范围内字节的访问。

1
2
3
4
5
6
7
8
try {
Path path = Paths.get("D:/test.txt");
FileChannel fc = FileChannel.open(path);
FileLock lock1 = fc.lock();//会阻塞直到获得锁
FileLock lock2 = fc.tryLock();//立即返回,要么返回锁,要么返回null
} catch (IOException ex){
ex.printStackTrace();
}

源码如下。

1
2
3
4
5
6
7
8
9
10
11
public final FileLock lock() throws IOException {
return lock(0L, Long.MAX_VALUE, false);
}

public abstract FileLock lock(long position, long size, boolean shared) throws IOException;

public final FileLock tryLock() throws IOException {
return tryLock(0L, Long.MAX_VALUE, false);
}

public abstract FileLock tryLock(long position, long size, boolean shared) throws IOException;

两种调用都可以只锁定文件部分内容(假如size设定到文件尾,新增的数据是默认不加锁,可以通过 Long.MAX_VALUE来锁所有字节),share 为false表示锁目的是读写,为true表示为共享锁,允许多进程读入,并阻止获取独占锁的行为。

释放锁和释放资源一样,都可以利用try-resource方式确保释放,参考try-catch,try-with-resource)。

文件加锁依赖操作系统,不同操作系统会有不同的设计和限制锁机制,文件锁归虚拟机所有,所以所有在同一虚拟机上启动的多个程序不可能同时获得同一个文件的锁。


第四节 一些源码**

源码-FileChannel**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package java.nio.channels;

public abstract class FileChannel extends AbstractInterruptibleChannel implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel {

protected FileChannel() { }

public static FileChannel open(Path path,
Set<? extends OpenOption> options,
FileAttribute<?>... attrs) throws IOException {
FileSystemProvider provider = path.getFileSystem().provider();
return provider.newFileChannel(path, options, attrs);
}

@SuppressWarnings({"unchecked", "rawtypes"}) // generic array construction
private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0];

public static FileChannel open(Path path, OpenOption... options) throws IOException {
Set<OpenOption> set = new HashSet<>(options.length);
Collections.addAll(set, options);
return open(path, set, NO_ATTRIBUTES);
}

// -- Channel operations --
public abstract int read(ByteBuffer dst) throws IOException;

public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

public final long read(ByteBuffer[] dsts) throws IOException {
return read(dsts, 0, dsts.length);
}

public abstract int write(ByteBuffer src) throws IOException;

public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

public final long write(ByteBuffer[] srcs) throws IOException {
return write(srcs, 0, srcs.length);
}


// -- Other operations --
public abstract long position() throws IOException;
public abstract FileChannel position(long newPosition) throws IOException;
public abstract long size() throws IOException;
public abstract FileChannel truncate(long size) throws IOException;
public abstract void force(boolean metaData) throws IOException;
public abstract long transferTo(long position, long count,
WritableByteChannel target) throws IOException;
public abstract long transferFrom(ReadableByteChannel src,
long position, long count) throws IOException;
public abstract int read(ByteBuffer dst, long position) throws IOException;
public abstract int write(ByteBuffer src, long position) throws IOException;

// -- Memory-mapped buffers --
public static class MapMode {
/**
* Mode for a read-only mapping.
*/
public static final MapMode READ_ONLY = new MapMode("READ_ONLY");

/**
* Mode for a read/write mapping.
*/
public static final MapMode READ_WRITE = new MapMode("READ_WRITE");

/**
* Mode for a private (copy-on-write) mapping.
*/
public static final MapMode PRIVATE = new MapMode("PRIVATE");

private final String name;

private MapMode(String name) {
this.name = name;
}

public String toString() {
return name;
}
}

public abstract MappedByteBuffer map(MapMode mode,
long position, long size) throws IOException;

// -- Locks --

public abstract FileLock lock(long position, long size, boolean shared) throws IOException;

public final FileLock lock() throws IOException {
return lock(0L, Long.MAX_VALUE, false);
}

public abstract FileLock tryLock(long position, long size, boolean shared) throws IOException;

public final FileLock tryLock() throws IOException {
return tryLock(0L, Long.MAX_VALUE, false);
}
}

源码-SocketChannel**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel {
protected SocketChannel(SelectorProvider provider) {
super(provider);
}

public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}

public static SocketChannel open(SocketAddress remote) throws IOException {
SocketChannel sc = open();
try {
sc.connect(remote);
} catch (Throwable x) {
try {
sc.close();
} catch (Throwable suppressed) {
x.addSuppressed(suppressed);
}
throw x;
}
assert sc.isConnected();
return sc;
}

public final int validOps() {
return (SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
}

// -- Socket-specific operations --
@Override
public abstract SocketChannel bind(SocketAddress local) throws IOException;

@Override
public abstract <T> SocketChannel setOption(SocketOption<T> name, T value) throws IOException;

public abstract SocketChannel shutdownInput() throws IOException;
public abstract SocketChannel shutdownOutput() throws IOException;
public abstract Socket socket();
public abstract boolean isConnected();
public abstract boolean isConnectionPending();
public abstract boolean connect(SocketAddress remote) throws IOException;
public abstract boolean finishConnect() throws IOException;
public abstract SocketAddress getRemoteAddress() throws IOException;

// -- ByteChannel operations --

public abstract int read(ByteBuffer dst) throws IOException;
public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
public final long read(ByteBuffer[] dsts) throws IOException {
return read(dsts, 0, dsts.length);
}
public abstract int write(ByteBuffer src) throws IOException;
public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
public final long write(ByteBuffer[] srcs) throws IOException {
return write(srcs, 0, srcs.length);
}

@Override
public abstract SocketAddress getLocalAddress() throws IOException;
}

参考:

🔗《Java核心技术 卷Ⅱ》

🔗 理解IO多路复用

🔗 Java NIO 之 Selector 选择器

🔗 Java NIO Channel

🔗 Java NIO1:浅谈I/O模型

🔗 Java NIO2:NIO概述

🔗 Java NIO3:缓冲区Buffer

🔗 Java NIO4:缓冲区Buffer(续)

🔗 Java NIO5:通道和文件通道

🔗 Java NIO:IO与NIO的区别

🔗 Java NIO6:选择器1——理论篇

🔗 怎样理解阻塞非阻塞与同步异步的区别?萧萧

🔗 Java NIO浅析