线程实际应用

线程实际应用

一. 等待超时模式

1.1 场景

调用方法后需要等待一段时间,若时间内可以得到结果,则立即返回结果,否则超时返回默认结果。等待/通知模式无法做到超时等待,步骤为加锁、条件循环和处理逻辑。

1.2 内容

超时则需要添加一个时间段T,所以可以得到等待时间为T,超时的时间节点为nowTime+T。所以在原等待/通知模式上可以加入一个超时处理,在等待返回后判断是否超时即可。

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
// 对当前对象加锁
public synchronized Object get(long mills) throws InterruptedException{
long future = System.currentTimeMills() + mills;
long remaining = mills;
// 当超时大于0并且result返回值不满足要求
while ((result == null) && remaining > 0){
wait(remaining);
remaining = future - System.currentTimeMills();
}
return result;
}

即使方法执行时间很长,也不会”永久”阻塞调用者,而是会按时返回。

二. 数据库连接池

2.1 场景

模拟从连接池中获取、使用和释放连接的过程。而客户端获取连接的过程被设定为等待超时模式。所以在一定时间若无法获取到可用连接,就会返回给客户端null。

2.2 内容

线程连接池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 连接池:通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,
* 调用方需要先调用fetchConnection(long)方法来指定在多少毫秒内超时获取连接,
* 当连接使用完毕后,需要调用releaseConnection(Connection)方法将连接放回线程池。
*/
public class ConnectionPool {
/**
* 双向队列-维护连接
*/
private LinkedList<Connection> pool = new LinkedList<>();

public ConnectionPool(int initialSize){
if(initialSize > 0){
for(int i = 0;i < initialSize;i++){
pool.addLast(ConnectionDriver.createConnection());
}
}
}

public void releaseConnection(Connection connection){
if(connection != null){
synchronized (pool){
// 连接释放后需要进行通知,这样其他消费者能够感知到连接池中已经归还了一个连接
pool.addLast(connection);
pool.notifyAll();
}
}
}

// 在mills内无法获取到连接,将会返回null
public Connection fetchConnection(long mills)throws InterruptedException{
synchronized (pool){
// 完全超时
if(mills <= 0){
while (pool.isEmpty()){
pool.wait();
}
return pool.removeFirst();
}else {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while (pool.isEmpty() && remaining > 0){
pool.wait(remaining);
remaining = future - System.currentTimeMillis();
}
Connection result = null;
if(!pool.isEmpty()){
result = pool.removeFirst();
}
return result;
}
}
}
}

连接驱动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 通过动态代理实现了Connection接口,代理只实现了在commit方法调用时休眠100毫秒
*/
public class ConnectionDriver {

static class ConnectionHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(method.getName().equals("commit")){
TimeUnit.MICROSECONDS.sleep(100);
}
return null;
}
}

//创建一个Connection的代理,在commit时休眠100毫秒
public static final Connection createConnection(){
return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
new Class<?>[]{ Connection.class},new ConnectionHandler());
}
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 模拟客户端ConnectionRunner获取、使用、最后释放连接的过程
*/
public class ConnectionPoolTest {
static ConnectionPool pool = new ConnectionPool(10);
// 保证所有ConnectionRunner能够同时开始
static CountDownLatch start = new CountDownLatch(1);
// main线程将会等待所有ConnectionRunner结束后才能继续执行
static CountDownLatch end;

public static void main(String[] args)throws Exception{
// 线程数量,可以修改线程数量进行观察
int threadCount = 10;
end = new CountDownLatch(threadCount);
int count = 20;
AtomicInteger got = new AtomicInteger();
AtomicInteger notGot = new AtomicInteger();
for(int i = 0;i < threadCount;i++){
Thread thread = new Thread(new ConnectionRunner(count,got,notGot),"ConnectionRunnerThread");
thread.start();
}
start.countDown();
end.await();
System.out.println("total invoke: " + (threadCount * count));
System.out.println("got connection: " + got);
System.out.println("not got connection " + notGot);
}

static class ConnectionRunner implements Runnable{
int count;
AtomicInteger got;
AtomicInteger notGot;

public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}

@Override
public void run() {
try {
// 从线程池中获取连接,如果1000ms内无法获取到,将会返回null
// 分别统计连接获取的数量got和未获取到的数量notGot
start.await();
}catch (Exception ex){
ex.printStackTrace();
}
while (count > 0){
try {
//
Connection connection = pool.fetchConnection(1000);
if(connection != null){
try {
connection.createStatement();
connection.commit();
}finally {
pool.releaseConnection(connection);
got.incrementAndGet();
}
}else {
notGot.incrementAndGet();
}
}catch (Exception ex){
ex.printStackTrace();
}finally {
count--;
}
}
end.countDown();
}
}
}

执行结果如下。

1
2
3
total invoke: 200
got connection: 200
not got connection 0

上述事例中使用了CountDownLatch来确保ConnectionRunnerThread能够同时开始执行,并且在全部执行结束后,才使main线程从等待状态返回。当前设定的场景是10个线程同时运行获取连接池中的连接,通过调节线程数量来观察未获取到连接的情况。线程数、总获取次数、获取到的数量、未获取到的数量以及未获取到的比率,如表所示。

线程数量与连接获取的关系

从表中的数据统计可以看出,在资源一定的情况下(连接池中的10个连接),随着客户端线程的逐步增加,客户端出现超时无法获取连接的比率不断升高。虽然客户端线程在这种超时获取的模式下会出现连接无法获取的情况,但是它能够保证客户端线程不会一直挂在连接获取的操作上,而是“按时”返回,并告知客户端连接获取出现问题,是系统的一种自我保护机制。数据库连接池的设计也可以复用到其他的资源获取的场景,针对昂贵资源(比如数据库连接)的获取都应该加以超时限制。

三. 线程池技术及其示例

3.1 场景

对于服务端的程序,经常面对的是客户端传入的短小(执行时间短、工作内容较为单一)任务,需要服务端快速处理并返回结果。如果服务端每次接受到一个任务,创建一个线程,然后进行执行,这在原型阶段是个不错的选择,但是面对成千上万的任务递交进服务器时,如果还是采用一个任务一个线程的方式,那么将会创建数以万记的线程,这不是一个好的选择。因为这会使操作系统频繁的进行线程上下文切换,无故增加系统的负载,而线程的创建和消亡都是需要耗费系统资源的,也无疑浪费了系统资源。

线程池技术能够很好地解决这个问题,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面,面对过量任务的提交能够平缓的劣化。

3.2 内容

1
2
3
4
5
6
7
8
9
10
11
12
public interface ThreadPool<Job extends Runnable> {
// 执行一个Job,这个Job需要实现Runnable
void execute(Job job);
// 关闭线程池
void shutdown();
// 增加工作者线程
void addWorkers(int num);
// 减少工作者线程
void removeWorker(int num);
// 得到正在等待执行的任务数量
int getJobSize();
}

客户端可以通过 execute(Job) 方法将Job提交入线程池执行,而客户端自身不用等待Job的执行完成。除了 execute(Job) 方法以外,线程池接口提供了增大/减少工作者线程以及关闭线程池的方法。这里工作者线程代表着一个重复执行Job的线程,而每个由客户端提交的Job都将进入到一个工作队列中等待工作者线程的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
// 线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 10;
// 线程池默认的数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
// 线程池最小的数量
private static final int MIN_WORKER_NUMBERS = 1;
// 这是一个工作列表,将会向里面插入工作
private final LinkedList<Job> jobs = new LinkedList<>();
// 工作者列表
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
// 工作者线程的数量
private int workerNum = DEFAULT_WORKER_NUMBERS;
// 线程编号生成
private AtomicLong threadNum = new AtomicLong();

public DefaultThreadPool() {
initializeWokers(DEFAULT_WORKER_NUMBERS);
}
public DefaultThreadPool(int num) {
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
initializeWokers(workerNum);
}

public void execute(Job job) {
if (job != null) {
// 添加一个工作,然后进行通知
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
}

public void shutdown() {
for (Worker worker : workers) {
worker.shutdown();
}
}

public void addWorkers(int num) {
synchronized (jobs) {
// 限制新增的Worker数量不能超过最大值
if (num + this.workerNum > MAX_WORKER_NUMBERS) {
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWokers(num);
this.workerNum += num;
}
}

public void removeWorker(int num) {
synchronized (jobs) {
if (num >= this.workerNum) {
throw new IllegalArgumentException("beyond workNum");
}
// 按照给定的数量停止Worker
int count = 0;
while (count < num) {
Worker worker = workers.get(count);
if (workers.remove(worker)) {
worker.shutdown();
count++;
}
}
this.workerNum -= count;
}
}

public int getJobSize() {
return jobs.size();
}

// 初始化线程工作者
private void initializeWokers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum. incrementAndGet());
thread.start();
}
}

// 工作者,负责消费任务
class Worker implements Runnable {
// 是否工作
private volatile boolean running = true;
public void run() {
while (running) {
Job job = null;
synchronized (jobs) {
// 如果工作者列表是空的,那么就wait
while (jobs.isEmpty()) {
try {
jobs.wait();
} catch (InterruptedException ex) {
// 感知到外部对WorkerThread的中断操作,返回
Thread.currentThread().interrupt();
return;
}
}
// 取出一个Job
job = jobs.removeFirst();
}
if (job != null) {
try {
job.run();
} catch (Exception ex) {
// 忽略Job执行中的Exception
}
}
}
}

public void shutdown() {
running = false;
}
}
}

从线程池的实现可以看到,当客户端调用execute(Job)方法时,会不断地向任务列表jobs中 添加Job,而每个工作者线程会不断地从jobs上取出一个Job进行执行,当jobs为空时,工作者线程进入等待状态。

添加一个Job后,对工作队列jobs调用了其notify()方法,而不是notifyAll()方法,因为能够 确定有工作者线程被唤醒,这时使用notify()方法将会比notifyAll()方法获得更小的开销(避免将等待队列中的线程全部移动到阻塞队列中)。

可以看到,线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作队列上取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户端提交了一个任务之后会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者线程会被唤醒。

四. 一个基于线程池技术的简单Web服务器

4.1 场景

目前的浏览器都支持多线程访问,比如说在请求一个HTML页面的时候,页面中包含的图片资源、样式资源会被浏览器发起并发的获取,这样用户就不会遇到一直等到一个图片完全下载完成才能继续查看文字内容的尴尬情况。

如果Web服务器是单线程的,多线程的浏览器也没有用武之地,因为服务端还是一个请求 一个请求的顺序处理。因此,大部分Web服务器都是支持并发访问的。常用的Java Web服务器,如Tomcat、Jetty,在其处理请求的过程中都使用到了线程池技术。

4.2 内容

下面通过使用前一节中的线程池来构造一个简单的Web服务器,这个Web服务器用来处理 HTTP请求,目前只能处理简单的文本和JPG图片内容。这个Web服务器使用main线程不断地接 受客户端Socket的连接,将连接以及请求提交给线程池处理,这样使得Web服务器能够同时处 理多个客户端请求,示例如代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class SimpleHttpServer {
// 处理HttpRequest的线程池
static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<HttpRequestHandler>(1);
// SimpleHttpServer的根路径
static String basePath;
static ServerSocket serverSocket;
// 服务监听端口
static int port = 8080;
public static void setPort(int port) {
if (port > 0) {
SimpleHttpServer.port = port;
}
}

public static void setBasePath(String basePath) {
if (basePath != null && new File(basePath).exists() && new File(basePath).
isDirectory()) {
SimpleHttpServer.basePath = basePath;
}
}

// 启动SimpleHttpServer
public static void start() throws Exception {
serverSocket = new ServerSocket(port);
Socket socket = null;
while ((socket = serverSocket.accept()) != null) {
// 接收一个客户端Socket,生成一个HttpRequestHandler,放入线程池执行
threadPool.execute(new HttpRequestHandler(socket));
}
serverSocket.close();
}

static class HttpRequestHandler implements Runnable {
private Socket socket;
public HttpRequestHandler(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
String line = null;
BufferedReader br = null;
BufferedReader reader = null;
PrintWriter out = null;
InputStream in = null;
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String header = reader.readLine();
// 由相对路径计算出绝对路径
String filePath = basePath + header.split(" ")[1];
out = new PrintWriter(socket.getOutputStream());
// 如果请求资源的后缀为jpg或者ico,则读取资源并输出
if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
in = new FileInputStream(filePath);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int i = 0;
while ((i = in.read()) != -1) {
baos.write(i);
}
byte[] array = baos.toByteArray();
out.println("HTTP/1.1 200 OK");
out.println("Server: Molly");
out.println("Content-Type: image/jpeg");
out.println("Content-Length: " + array.length);
out.println("");
socket.getOutputStream().write(array, 0, array.length);
} else {
br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
out = new PrintWriter(socket.getOutputStream());
out.println("HTTP/1.1 200 OK");
out.println("Server: Molly");
out.println("Content-Type: text/html; charset=UTF-8");
out.println("");
while ((line = br.readLine()) != null) {
out.println(line);
}
}
out.flush();
} catch (Exception ex) {
out.println("HTTP/1.1 500");
out.println("");
out.flush();
} finally {
close(br, in, reader, out, socket);
}
}
}

// 关闭流或者Socket
private static void close(Closeable... closeables) {
if (closeables != null) {
for (Closeable closeable : closeables) {
try {
closeable.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}

该Web服务器处理用户请求的时序图如图所示。

SimpleHttpServer时序图

在图中,SimpleHttpServer在建立了与客户端的连接之后,并不会处理客户端的请求,而是将其包装成HttpRequestHandler并交由线程池处理。在线程池中的Worker处理客户端请求的同时,SimpleHttpServer能够继续完成后续客户端连接的建立,不会阻塞后续客户端的请求。

接下来,通过一个测试对比来认识线程池技术带来服务器吞吐量的提高。我们准备了一个简单的HTML页面,内容如代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<html>    
<head>
<title>测试页面</title>
</head>

<body>
<h1>第一张图片</h1>
<img src="1.jpg" />
<h1>第二张图片</h1>
<img src="2.jpg" />
<h1>第三张图片</h1>
<img src="3.jpg" />
</body>
</html>

将SimpleHttpServer的根目录设定到该HTML页面所在目录,并启动SimpleHttpServer,通过Apache HTTP server benchmarking tool(版本2.3)来测试不同线程数下,SimpleHttpServer的吞吐量表现。

测试场景是5000次请求,分10个线程并发执行,测试内容主要考察响应时间(越小越好)和每秒查询的数量(越高越好),测试结果如表所示(机器CPU:i7-3635QM,内存为 8GB,实际输出可能与此表不同)。

测试结果

可以看到,随着线程池中线程数量的增加,SimpleHttpServer的吞吐量不断增大,响应时间不断变小,线程池的作用非常明显。

但是,线程池中线程数量并不是越多越好,具体的数量需要评估每个任务的处理时间,以及当前计算机的处理器能力和数量。使用的线程过少,无法发挥处理器的性能;使用的线程过多,将会增加系统的无故开销,起到相反的作用。

五. 小结

本章从介绍多线程技术带来的好处开始,讲述了如何启动和终止线程以及线程的状态,详细阐述了多线程之间进行通信的基本方式和等待/通知经典范式。在线程应用示例中,使用了等待超时、数据库连接池以及简单线程池3个不同的示例巩固本章前面章节所介绍的Java多线程基础知识。最后通过一个简单的Web服务器将上述知识点串联起来,加深我们对这些知识点的理解。


参考:

🔗《Java并发编程的艺术》