ZooKeeper (5) Internals: The Client

3. Client

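Core client components:

  • ZooKeeper instance: the client entry point.
  • ClientWatchManager: the client-side Watcher manager.
  • HostProvider: the manager of the client's server address list.
  • ClientCnxn: the client's core connection class. It contains two threads, SendThread and EventThread. SendThread is the I/O thread that handles network I/O between client and server; EventThread is the event thread that processes events coming from the server.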

Constructor API:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly);

Client initialization and startup steps:

  1. Set the default Watcher.
  2. Set the ZooKeeper server address list.
  3. Create the ClientCnxn.

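3.1 How a session is created

Initialization phase:

  1. Initialize the ZK object.

    Calling the ZooKeeper constructor instantiates a ZK object. During initialization the client creates its Watcher manager, ClientWatchManager.

  2. Set the default session Watcher.

    If a Watcher object is passed to the constructor, the client stores it in ClientWatchManager as the default Watcher.

  3. Construct the ZK server address list manager: HostProvider.

    The client stores the server addresses passed to the constructor in the HostProvider.

  4. Create and initialize the client network connector: ClientCnxn.

    The client first creates a ClientCnxn to manage network interaction between client and server. It also initializes the client's two core queues, outgoingQueue and pendingQueue, which serve as the request send queue and the queue of requests awaiting a server response, respectively.

    It also creates ClientCnxnSocket, the low-level I/O handler of ClientCnxn.

  5. Initialize SendThread and EventThread.

    SendThread is the I/O thread that handles network I/O between client and server; EventThread is the event thread that processes events coming from the server.

    ClientCnxnSocket is assigned to SendThread as its low-level network I/O handler, and EventThread's queue of pending events, waitingEvents, is initialized to hold all events waiting to be processed by the client.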

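Session creation phase:

  1. Start SendThread and EventThread.

    SendThread first checks the current client state and performs some cleanup work in preparation for sending the "session creation" request.

  2. Obtain a server address.

    Before creating the TCP connection, SendThread needs a target ZK server address. It usually takes one at random from HostProvider and delegates to ClientCnxnSocket to create the TCP connection.

  3. Create the TCP connection.

    ClientCnxnSocket establishes a long-lived TCP connection to the server.

  4. Construct the ConnectRequest.

    Once the TCP connection is up, the client and server are connected only at the socket level; the session itself has not been created yet.

    SendThread builds a ConnectRequest from the client's current settings. This request represents the client's attempt to create a session with the server.

    The ZK client then wraps the request in a Packet object, the unit of the network I/O layer, and puts it in the request send queue, outgoingQueue.

  5. Send the request.

    With the request ready, the client starts sending. ClientCnxnSocket takes a pending Packet from outgoingQueue, serializes it into a ByteBuffer, and sends it to the server.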

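Response handling phase:

  1. Receive the server response.

    When ClientCnxnSocket receives a response, it first checks whether the client is already "initialized". If not, the response must be the reply to the session creation request, and it is handed directly to the readConnectResult method.

  2. Process the response.

    ClientCnxnSocket deserializes the response into a ConnectResponse object and reads from it the session ID assigned by the ZK server.

  3. Connection succeeds.

    On success, SendThread is notified so that it can set the session parameters, including readTimeout and connectTimeout, and update the client state.

    HostProvider is also notified of the server address that was connected successfully.

  4. Generate the SyncConnected-None event.

    So that the application learns the session has been created, SendThread generates a SyncConnected-None event and passes it to EventThread.

  5. Look up the Watcher.

    On receiving the event, EventThread looks up the matching Watcher in ClientWatchManager. For SyncConnected-None this is the default Watcher stored in step 2 of the initialization phase. The Watcher is placed in EventThread's waitingEvents queue.

  6. Process the event.

    EventThread keeps taking pending Watcher objects off the waitingEvents queue and calls each object's process method, which is what triggers the Watcher.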
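
The hand-off in steps 4 to 6 is a producer/consumer queue: the I/O side enqueues, and a dedicated thread delivers callbacks one at a time. The sketch below illustrates that idea with the standard library only. It is not ZooKeeper's code: `MiniWatcher`, `MiniEventThread`, and the plain-string event are hypothetical stand-ins chosen for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a Watcher: receives one event per call. */
interface MiniWatcher {
    void process(String event);
}

/** Sketch of an event thread: the I/O side enqueues, this thread dispatches. */
final class MiniEventThread extends Thread {
    /** A (watcher, event) pair waiting to be delivered. */
    private static final class Pair {
        final MiniWatcher watcher;
        final String event;
        Pair(MiniWatcher watcher, String event) {
            this.watcher = watcher;
            this.event = event;
        }
    }

    private static final Pair DEATH = new Pair(null, null); // sentinel that stops the loop
    private final LinkedBlockingQueue<Pair> waitingEvents = new LinkedBlockingQueue<>();
    private final MiniWatcher defaultWatcher;

    MiniEventThread(MiniWatcher defaultWatcher) {
        this.defaultWatcher = defaultWatcher;
        setDaemon(true);
    }

    /** Called by the I/O thread: pair the event with its watcher and queue it. */
    void queueEvent(String event) {
        waitingEvents.add(new Pair(defaultWatcher, event));
    }

    void shutdown() {
        waitingEvents.add(DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Pair p = waitingEvents.take();
                if (p == DEATH) {
                    return;
                }
                p.watcher.process(p.event); // callbacks run serially on this thread
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Pushes one event through the thread and returns what the watcher saw. */
    static List<String> demo() {
        List<String> seen = new CopyOnWriteArrayList<>();
        MiniEventThread t = new MiniEventThread(seen::add);
        t.start();
        t.queueEvent("SyncConnected-None");
        t.shutdown();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Because every callback runs on the single event thread, a slow watcher delays all later events in this sketch; that is the usual trade-off of serial dispatch.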

3.2 Server address list

The connectString parameter of the ZK constructor is the server address list. It is usually a string of IP:port pairs separated by ASCII commas:

192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181

After receiving the server address list, the ZK client first wraps it in a ConnectStringParser object:

public final class ConnectStringParser {
    String chrootPath;
    ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
}

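(1) Client namespace isolation: Chroot

ZooKeeper 3.2.0 introduced the Chroot feature, which lets each client set a namespace for itself. Every operation that a client with a Chroot performs against the server is confined to that namespace.

For example, to give application X all the nodes under /apps/X, the application sets the Chroot of all its ZK clients to /apps/X. From the client's point of view /apps/X is the root: every path it sends to ZK is a path relative to that root. This design is very helpful for isolating applications from one another.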
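
The path translation implied by a Chroot can be sketched in a few lines. The helper below is hypothetical (`ChrootPaths` is not a ZooKeeper class); it mirrors the prefixing and stripping logic visible in the SendThread listing later in this article (`prependChroot` and the chroot handling in `readResponse`).

```java
/** Sketch of chroot path translation; ChrootPaths is a hypothetical helper. */
final class ChrootPaths {
    private ChrootPaths() {}

    /** Client path to server path: the chroot is prepended. */
    static String toServerPath(String chroot, String clientPath) {
        if (chroot == null) {
            return clientPath;
        }
        // The client's root "/" maps to the chroot node itself.
        return clientPath.length() == 1 ? chroot : chroot + clientPath;
    }

    /** Server path to client path: the chroot prefix is stripped. */
    static String toClientPath(String chroot, String serverPath) {
        if (chroot == null) {
            return serverPath;
        }
        if (serverPath.equals(chroot)) {
            return "/";
        }
        if (serverPath.length() > chroot.length()) {
            return serverPath.substring(chroot.length());
        }
        return serverPath; // too short for the chroot: left unchanged in this sketch
    }
}
```

With a Chroot of /apps/X, the client path /config becomes /apps/X/config on the server, and an event for /apps/X/config is reported back to the client as /config.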

The Chroot is set by appending it as a suffix to connectString; the parser extracts it and stores it in the chrootPath field:

192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181/apps/X
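
To make the split between host list and Chroot suffix concrete, here is a minimal parsing sketch. It is not the real ConnectStringParser: the class name `MiniConnectString` is invented for this example, and falling back to port 2181 when no port is given is an assumption of the sketch.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Sketch of connect-string parsing; not the real ConnectStringParser. */
final class MiniConnectString {
    static final int DEFAULT_PORT = 2181; // assumed fallback for this sketch

    final String chrootPath; // null when no chroot suffix is given
    final List<InetSocketAddress> serverAddresses = new ArrayList<>();

    MiniConnectString(String connectString) {
        int slash = connectString.indexOf('/');
        String hosts = connectString;
        String chroot = null;
        if (slash >= 0) {
            chroot = connectString.substring(slash);
            hosts = connectString.substring(0, slash);
            if (chroot.length() == 1) {
                chroot = null; // a bare "/" means no chroot
            }
        }
        this.chrootPath = chroot;

        for (String host : hosts.split(",")) {
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            // Unresolved on purpose: resolution is left to the host provider.
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
}
```

Parsing the example string above yields three unresolved addresses and a chrootPath of /apps/X.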

(2) Address list manager: HostProvider

ConnectStringParser does some light processing of the server addresses and wraps each one in an InetSocketAddress object. The processed address list is then wrapped in a StaticHostProvider.

public interface HostProvider {

    public int size();

    public InetSocketAddress next(long spinDelay);

    public void onConnected();
}

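The three requirements of a HostProvider:

  • next() must always return a valid value.

    Every call must return a valid InetSocketAddress object, never null or an otherwise invalid object.

  • next() must return a resolved InetSocketAddress.

    The addresses held in ConnectStringParser.serverAddresses are unresolved InetSocketAddress objects. They are resolved only after being passed to the HostProvider. Resolution does not have to happen inside next(), but whatever next() returns must be resolved.

  • size() must not return 0.

    A HostProvider must hold at least one server address.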

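(3) StaticHostProvider

StaticHostProvider is the default implementation of the HostProvider interface. It works as follows:

  • Resolving server addresses: the unresolved addresses in ConnectStringParser.serverAddresses are resolved one by one and placed in the provider's own serverAddresses collection, which is then randomly shuffled with Collections.shuffle.

  • Obtaining an available server address: next() returns an available server address. It does not simply walk the collection once from start to end; the shuffled list is treated as a circular queue that is traversed with two cursors.

    Because the address list may be short, next() waits for spinDelay milliseconds when the two cursors hold the same value.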
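
The shuffled circular list with two cursors can be sketched as follows. This is an illustration under assumptions, not StaticHostProvider itself: `MiniHostProvider` is a made-up name, addresses are plain strings, and the cursor names `currentIndex` and `lastIndex` are chosen for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Sketch of a shuffled, circular server list with two cursors. */
final class MiniHostProvider {
    private final List<String> servers;
    private int currentIndex = -1; // last address handed out
    private int lastIndex = -1;    // last address that connected successfully

    MiniHostProvider(List<String> addresses, Random random) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one server address is required");
        }
        this.servers = new ArrayList<>(addresses);
        Collections.shuffle(this.servers, random); // spread clients across servers
    }

    int size() {
        return servers.size();
    }

    /** Returns the next address, wrapping around at the end of the list. */
    String next(long spinDelayMillis) {
        currentIndex = (currentIndex + 1) % servers.size();
        if (currentIndex == lastIndex && spinDelayMillis > 0) {
            // A full lap without success: pause before retrying the same server.
            try {
                Thread.sleep(spinDelayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else if (lastIndex == -1) {
            lastIndex = 0; // first call: start counting laps from the head
        }
        return servers.get(currentIndex);
    }

    /** Remembers which address worked, so a later full lap can be detected. */
    void onConnected() {
        lastIndex = currentIndex;
    }
}
```

Passing the Random in from outside is a choice made for this sketch so that the shuffle can be reproduced in tests.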

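Custom HostProvider

  • Configuration file: the default ZK implementation takes the server address list through the constructor. A custom HostProvider can instead load the address list from a configuration file at application startup.
  • Dynamically updated address list: migrating a whole ZK cluster, or changing individual machines, forces many client applications to change too when the IP addresses are hard-coded in the program. A HostProvider can instead periodically resolve the ZK server address list from DNS or a configuration management center and update the serverAddresses collection whenever the list changes.
  • Same-data-center-first policy: once a project grows large enough it may span several data centers, possibly in different regions. How should the latency between data centers (often tens of milliseconds) be handled? With a same-data-center-first policy, service consumers prefer services in their own data center; a HostProvider can likewise prefer to create sessions with ZK servers in the same data center.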
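
As one possible starting point for the same-data-center-first idea, the sketch below reorders an address list so that local servers come first. Identifying "local" servers by an address prefix is purely an assumption of this example, and `LocalFirstOrdering` is a hypothetical helper; a real HostProvider would still have to satisfy the three requirements listed earlier.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: put servers of the local data center ahead of remote ones. */
final class LocalFirstOrdering {
    private LocalFirstOrdering() {}

    /**
     * Returns a new list with addresses that start with localPrefix first.
     * The relative order inside each group is preserved.
     */
    static List<String> order(List<String> addresses, String localPrefix) {
        List<String> local = new ArrayList<>();
        List<String> remote = new ArrayList<>();
        for (String address : addresses) {
            if (address.startsWith(localPrefix)) {
                local.add(address);
            } else {
                remote.add(address);
            }
        }
        local.addAll(remote); // remote servers remain available as a fallback
        return local;
    }
}
```

A custom provider could shuffle each group separately before concatenating them, which keeps load spreading within a data center while still preferring local servers.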

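3.3 ClientCnxn: Network I/O

ClientCnxn manages the network connection between client and server and carries out all network communication over it.

(1) Packet

Packet is a wrapper at the protocol layer that carries requests and responses. It is a static nested class of ClientCnxn.

Packet's createBB() method serializes the Packet object into the ByteBuffer that is sent over the network. Only three fields are serialized: requestHeader, request, and readOnly (the last one only for a ConnectRequest). All other fields stay in the client-side context and are never transmitted.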

static class Packet {
    RequestHeader requestHeader;
    ReplyHeader replyHeader;
    Record request;
    Record response;
    ByteBuffer bb;
    String clientPath;
    String serverPath;
    boolean finished;
    AsyncCallback cb;
    Object ctx;
    WatchRegistration watchRegistration;
    public boolean readOnly;

    ......

    public void createBB() {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            // Write a placeholder for the length; it is overwritten below
            boa.writeInt(-1, "len");
            if (this.requestHeader != null) {
                // requestHeader is serialized
                this.requestHeader.serialize(boa, "header");
            }

            if (this.request instanceof ConnectRequest) {
                // request and readOnly are serialized
                this.request.serialize(boa, "connect");
                boa.writeBool(this.readOnly, "readOnly");
            } else if (this.request != null) {
                // request is serialized
                this.request.serialize(boa, "request");
            }

            baos.close();
            this.bb = ByteBuffer.wrap(baos.toByteArray());
            // Fill in the real length: total size minus the 4-byte length field
            this.bb.putInt(this.bb.capacity() - 4);
            this.bb.rewind();
        } catch (IOException var3) {
            ClientCnxn.LOG.warn("Ignoring unexpected exception", var3);
        }
    }

    ......
}
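
The length-prefix trick in createBB() (write a placeholder, serialize the body, then patch the real length into the first four bytes) can be tried in isolation. The following sketch uses only the standard library; `LengthPrefixedFrame` is a name invented for this example and the payload stands in for the serialized header and request.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

/** Sketch of the framing used by createBB(): [4-byte length][body]. */
final class LengthPrefixedFrame {
    private LengthPrefixedFrame() {}

    static ByteBuffer encode(byte[] payload) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);   // placeholder: the length is not known yet
            out.write(payload); // stands in for the serialized header and request
            out.flush();

            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4); // overwrite the placeholder with the body length
            bb.rewind();                  // back to position 0 so the whole frame is sent
            return bb;
        } catch (IOException e) {
            // Not expected for an in-memory stream; surfaced unchecked in this sketch.
            throw new UncheckedIOException(e);
        }
    }
}
```

Both DataOutputStream and ByteBuffer default to big-endian byte order, so the patched length matches the placeholder's layout.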

(2) outgoingQueue and pendingQueue

  • outgoingQueue: the client's request send queue. It holds the Packets that still need to be sent to the server.
  • pendingQueue: the queue of requests awaiting a server response. It holds the Packets that have been sent to the server but have not yet received a response.

@SuppressFBWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
public class ClientCnxn {
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);
    private static final int SET_WATCHES_MAX_LENGTH = 131072;
    private static boolean disableAutoWatchReset = Boolean.getBoolean("zookeeper.disableAutoWatchReset");
    private final CopyOnWriteArraySet<ClientCnxn.AuthData> authInfo;
    // Both queues are linked lists
    private final LinkedList<ClientCnxn.Packet> pendingQueue;
    private final LinkedList<ClientCnxn.Packet> outgoingQueue;
    ......

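(3) Low-level socket communication layer: ClientCnxnSocket

Since version 3.4.0, ClientCnxnSocket (an abstract class) has been factored out of ClientCnxn. This makes the client code structure clearer and makes the low-level socket layer easier to extend (for example, with a Netty-based implementation).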

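A custom implementation can be selected by setting the system property zookeeper.clientCnxnSocket to the fully qualified class name of a ClientCnxnSocket implementation:

# The default implementation is ClientCnxnSocketNIO, built on Java's native NIO API; its core is the doIO logic, which sends requests and receives responses
-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNIO

  • Sending requests:

    • A sendable Packet is taken from outgoingQueue, a client request sequence number (XID) is generated and set in the Packet's request header, and the Packet is then serialized and sent.
    • "Sendable" means: while the client and server are in the middle of SASL authentication, only Packets without a request header (requestHeader), such as the session creation request, may be sent; all others are held back.
    • Once a Packet has been sent, it is immediately saved to pendingQueue. As the doIO code shows, ping and auth Packets, and Packets without a request header, are exceptions: they get no new XID and are not added to pendingQueue.
  • Receiving responses: once the client has received a complete response from the server, it handles it according to the request type.

    • If the client is not yet initialized, a session is being created, so the received ByteBuffer (incomingBuffer) is deserialized directly into a ConnectResponse object.
    • If the client is in a normal session and the response is an event, the received ByteBuffer (incomingBuffer) is deserialized into a WatcherEvent object and placed in the queue of events to be processed.
    • If it is a regular response (to a Create, GetData, or Exists request, for example), a Packet is taken from pendingQueue for processing. The client first checks the XID in the server response to ensure that requests are processed in order, then deserializes the received ByteBuffer (incomingBuffer) into the corresponding Response object.

    Finally, the ClientCnxn.finishPacket method handles Watcher registration and related logic.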
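
The interplay of the two queues and the XID check can be modeled without any networking. The sketch below is a simplified illustration, not ClientCnxn: `MiniQueues` and its nested `Packet` are hypothetical, requests and responses are plain strings, and the special cases (ping, auth, SASL) are left out.

```java
import java.util.LinkedList;

/** Sketch of the outgoingQueue -> pendingQueue -> response flow. */
final class MiniQueues {
    static final class Packet {
        final String request;
        int xid;
        String response;
        boolean finished;

        Packet(String request) {
            this.request = request;
        }
    }

    private final LinkedList<Packet> outgoingQueue = new LinkedList<>();
    private final LinkedList<Packet> pendingQueue = new LinkedList<>();
    private int nextXid = 1;

    void submit(Packet p) {
        outgoingQueue.add(p);
    }

    /** "Sends" the head of outgoingQueue: assign an XID, then park it in pendingQueue. */
    Packet sendNext() {
        Packet p = outgoingQueue.poll();
        if (p == null) {
            return null;
        }
        p.xid = nextXid++;
        pendingQueue.add(p);
        return p;
    }

    /** Matches a reply against the oldest pending packet; replies must arrive in order. */
    Packet onResponse(int replyXid, String body) {
        if (pendingQueue.isEmpty()) {
            throw new IllegalStateException("Nothing in the queue, but got " + replyXid);
        }
        Packet p = pendingQueue.remove();
        if (p.xid != replyXid) {
            throw new IllegalStateException("Xid out of order. Got " + replyXid + " expected " + p.xid);
        }
        p.response = body;
        p.finished = true;
        return p;
    }

    int pendingSize() {
        return pendingQueue.size();
    }
}
```

Because the server answers requests of one session in order, comparing the reply's XID with the head of pendingQueue is enough to detect a lost or reordered response.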

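private void finishPacket(ClientCnxn.Packet p) {
    if (p.watchRegistration != null) {
        // Pass the reply's error code; the Watcher is added to the set keyed by
        // clientPath only if the code allows it (normally 0, i.e. success)
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    if (p.cb == null) {
        // No callback (synchronous call): mark the Packet finished and wake the waiting thread
        synchronized(p) {
            p.finished = true;
            p.notifyAll();
        }
    } else {
        // Callback present (asynchronous call): mark the Packet finished and queue it on the EventThread
        p.finished = true;
        this.eventThread.queuePacket(p);
    }
}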
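
The synchronous branch of finishPacket relies on the caller blocking on the Packet until it is marked finished. A minimal sketch of that wait/notify handshake follows; `SyncPacket` is a hypothetical class for illustration, and the string result stands in for the real response record.

```java
/** Sketch of the wait/notify handshake behind a synchronous call. */
final class SyncPacket {
    private boolean finished;
    private String result;

    /** Called by the I/O thread once the reply has been read. */
    synchronized void finish(String result) {
        this.result = result;
        this.finished = true;
        notifyAll(); // wake the caller blocked in awaitResult()
    }

    /** Called by the application thread: blocks until finish() has run. */
    synchronized String awaitResult() throws InterruptedException {
        while (!finished) { // the loop guards against spurious wake-ups
            wait();
        }
        return result;
    }

    /** Finishes the packet from a second thread and returns what the caller sees. */
    static String demo() {
        SyncPacket p = new SyncPacket();
        Thread io = new Thread(() -> p.finish("reply"));
        io.start();
        try {
            return p.awaitResult();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Checking the flag inside the synchronized block means the handshake works whether finish() runs before or after the caller starts waiting.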

The abstract class ClientCnxnSocket declares a set of abstract methods and provides default implementations such as readConnectResult:

abstract class ClientCnxnSocket {
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);
protected boolean initialized;
protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
protected ByteBuffer incomingBuffer;
protected long sentCount;
protected long recvCount;
protected long lastHeard;
protected long lastSend;
protected long now;
protected SendThread sendThread;
protected long sessionId;

ClientCnxnSocket() {
this.incomingBuffer = this.lenBuffer;
this.sentCount = 0L;
this.recvCount = 0L;
}

void introduce(SendThread sendThread, long sessionId) {
this.sendThread = sendThread;
this.sessionId = sessionId;
}

void updateNow() {
this.now = Time.currentElapsedTime();
}

int getIdleRecv() {
return (int)(this.now - this.lastHeard);
}

int getIdleSend() {
return (int)(this.now - this.lastSend);
}

long getSentCount() {
return this.sentCount;
}

long getRecvCount() {
return this.recvCount;
}

void updateLastHeard() {
this.lastHeard = this.now;
}

void updateLastSend() {
this.lastSend = this.now;
}

void updateLastSendAndHeard() {
this.lastSend = this.now;
this.lastHeard = this.now;
}

protected void readLength() throws IOException {
int len = this.incomingBuffer.getInt();
if (len >= 0 && len < ClientCnxn.packetLen) {
this.incomingBuffer = ByteBuffer.allocate(len);
} else {
throw new IOException("Packet len" + len + " is out of range!");
}
}

void readConnectResult() throws IOException {
if (LOG.isTraceEnabled()) {
StringBuilder buf = new StringBuilder("0x[");
byte[] var2 = this.incomingBuffer.array();
int var3 = var2.length;

for(int var4 = 0; var4 < var3; ++var4) {
byte b = var2[var4];
buf.append(Integer.toHexString(b) + ",");
}

buf.append("]");
LOG.trace("readConnectResult " + this.incomingBuffer.remaining() + " " + buf.toString());
}

ByteBufferInputStream bbis = new ByteBufferInputStream(this.incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
boolean isRO = false;

try {
isRO = bbia.readBool("readOnly");
} catch (IOException var6) {
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}

this.sessionId = conRsp.getSessionId();
this.sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}

abstract boolean isConnected();

abstract void connect(InetSocketAddress var1) throws IOException;

abstract SocketAddress getRemoteSocketAddress();

abstract SocketAddress getLocalSocketAddress();

abstract void cleanup();

abstract void close();

abstract void wakeupCnxn();

abstract void enableWrite();

abstract void disableWrite();

abstract void enableReadWriteOnly();

abstract void doTransport(int var1, List<Packet> var2, LinkedList<Packet> var3, ClientCnxn var4) throws IOException, InterruptedException;

abstract void testableCloseSocket() throws IOException;

abstract void sendPacket(Packet var1) throws IOException;
}
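
The receiving side mirrors the length prefix written by createBB(): readLength() first reads four bytes, validates the value, and only then allocates a buffer for the body. Below is a standalone sketch of that validation. `FrameReader` is an invented name, and the 4 MiB limit is an assumption of this sketch standing in for ClientCnxn.packetLen.

```java
import java.nio.ByteBuffer;

/** Sketch of the length check performed before reading a response body. */
final class FrameReader {
    static final int MAX_PACKET_LEN = 4096 * 1024; // assumed limit for this sketch

    private FrameReader() {}

    /** Validates the 4-byte length prefix and allocates a buffer for the body. */
    static ByteBuffer allocateForBody(ByteBuffer lenBuffer) {
        int len = lenBuffer.getInt();
        if (len < 0 || len >= MAX_PACKET_LEN) {
            // Refuse absurd sizes instead of allocating whatever the peer claims.
            throw new IllegalArgumentException("Packet len " + len + " is out of range");
        }
        return ByteBuffer.allocate(len);
    }
}
```

Validating the length before allocating protects the client from a corrupted or hostile stream that announces a huge packet.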

The default implementation, ClientCnxnSocketNIO:

public class ClientCnxnSocketNIO extends ClientCnxnSocket {
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNIO.class);
private final Selector selector = Selector.open();
private SelectionKey sockKey;

ClientCnxnSocketNIO() throws IOException {
}

boolean isConnected() {
return this.sockKey != null;
}

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel)this.sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
} else {
if (this.sockKey.isReadable()) {
int rc = sock.read(this.incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(this.sessionId) + ", likely server has closed socket");
}

if (!this.incomingBuffer.hasRemaining()) {
this.incomingBuffer.flip();
if (this.incomingBuffer == this.lenBuffer) {
++this.recvCount;
this.readLength();
} else if (!this.initialized) {
this.readConnectResult();
this.enableRead();
if (this.findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
this.enableWrite();
}

this.lenBuffer.clear();
this.incomingBuffer = this.lenBuffer;
this.updateLastHeard();
this.initialized = true;
} else {
this.sendThread.readResponse(this.incomingBuffer);
this.lenBuffer.clear();
this.incomingBuffer = this.lenBuffer;
this.updateLastHeard();
}
}
}

if (this.sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p = this.findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
this.updateLastSend();
if (p.bb == null) {
if (p.requestHeader != null && p.requestHeader.getType() != 11 && p.requestHeader.getType() != 100) {
p.requestHeader.setXid(cnxn.getXid());
}

p.createBB();
}

sock.write(p.bb);
if (!p.bb.hasRemaining()) {
++this.sentCount;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null && p.requestHeader.getType() != 11 && p.requestHeader.getType() != 100) {
synchronized(pendingQueue) {
pendingQueue.add(p);
}
}
}
}

if (outgoingQueue.isEmpty()) {
this.disableWrite();
} else if (!this.initialized && p != null && !p.bb.hasRemaining()) {
this.disableWrite();
} else {
this.enableWrite();
}
}
}

}
}

private Packet findSendablePacket(LinkedList<Packet> outgoingQueue, boolean clientTunneledAuthenticationInProgress) {
synchronized(outgoingQueue) {
if (outgoingQueue.isEmpty()) {
return null;
} else if (((Packet)outgoingQueue.getFirst()).bb == null && clientTunneledAuthenticationInProgress) {
ListIterator iter = outgoingQueue.listIterator();

while(iter.hasNext()) {
Packet p = (Packet)iter.next();
if (p.requestHeader == null) {
iter.remove();
outgoingQueue.add(0, p);
return p;
}

if (LOG.isDebugEnabled()) {
LOG.debug("deferring non-priming packet: " + p + "until SASL authentication completes.");
}
}

return null;
} else {
return (Packet)outgoingQueue.getFirst();
}
}
}

void cleanup() {
if (this.sockKey != null) {
SocketChannel sock = (SocketChannel)this.sockKey.channel();
this.sockKey.cancel();

try {
sock.socket().shutdownInput();
} catch (IOException var7) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring exception during shutdown input", var7);
}
}

try {
sock.socket().shutdownOutput();
} catch (IOException var6) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring exception during shutdown output", var6);
}
}

try {
sock.socket().close();
} catch (IOException var5) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring exception during socket close", var5);
}
}

try {
sock.close();
} catch (IOException var4) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring exception during channel close", var4);
}
}
}

try {
Thread.sleep(100L);
} catch (InterruptedException var3) {
if (LOG.isDebugEnabled()) {
LOG.debug("SendThread interrupted during sleep, ignoring");
}
}

this.sockKey = null;
}

void close() {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Doing client selector close");
}

this.selector.close();
if (LOG.isTraceEnabled()) {
LOG.trace("Closed client selector");
}
} catch (IOException var2) {
LOG.warn("Ignoring exception during selector close", var2);
}

}

SocketChannel createSock() throws IOException {
SocketChannel sock = SocketChannel.open();
sock.configureBlocking(false);
sock.socket().setSoLinger(false, -1);
sock.socket().setTcpNoDelay(true);
return sock;
}

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
this.sockKey = sock.register(this.selector, 8);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
this.sendThread.primeConnection();
}

}

void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = this.createSock();

try {
this.registerAndConnect(sock, addr);
} catch (IOException var4) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw var4;
}

this.initialized = false;
this.lenBuffer.clear();
this.incomingBuffer = this.lenBuffer;
}

SocketAddress getRemoteSocketAddress() {
try {
return ((SocketChannel)this.sockKey.channel()).socket().getRemoteSocketAddress();
} catch (NullPointerException var2) {
return null;
}
}

SocketAddress getLocalSocketAddress() {
try {
return ((SocketChannel)this.sockKey.channel()).socket().getLocalSocketAddress();
} catch (NullPointerException var2) {
return null;
}
}

synchronized void wakeupCnxn() {
this.selector.wakeup();
}

void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {
this.selector.select((long)waitTimeOut);
Set selected;
synchronized(this) {
selected = this.selector.selectedKeys();
}

this.updateNow();
Iterator var6 = selected.iterator();

while(var6.hasNext()) {
SelectionKey k = (SelectionKey)var6.next();
SocketChannel sc = (SocketChannel)k.channel();
if ((k.readyOps() & 8) != 0) {
if (sc.finishConnect()) {
this.updateLastSendAndHeard();
this.sendThread.primeConnection();
}
} else if ((k.readyOps() & 5) != 0) {
this.doIO(pendingQueue, outgoingQueue, cnxn);
}
}

if (this.sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (this.findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
this.enableWrite();
}
}
}

selected.clear();
}

void testableCloseSocket() throws IOException {
LOG.info("testableCloseSocket() called");
((SocketChannel)this.sockKey.channel()).socket().close();
}

synchronized void enableWrite() {
int i = this.sockKey.interestOps();
if ((i & 4) == 0) {
this.sockKey.interestOps(i | 4);
}

}

public synchronized void disableWrite() {
int i = this.sockKey.interestOps();
if ((i & 4) != 0) {
this.sockKey.interestOps(i & -5);
}

}

private synchronized void enableRead() {
int i = this.sockKey.interestOps();
if ((i & 1) == 0) {
this.sockKey.interestOps(i | 1);
}

}

synchronized void enableReadWriteOnly() {
this.sockKey.interestOps(5);
}

Selector getSelector() {
return this.selector;
}

void sendPacket(Packet p) throws IOException {
SocketChannel sock = (SocketChannel)this.sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
} else {
p.createBB();
ByteBuffer pbb = p.bb;
sock.write(pbb);
}
}
}
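
The listing above is decompiled, so it shows raw integers where named constants are easier to read. The interest-op values come from java.nio.channels.SelectionKey: OP_READ is 1, OP_WRITE is 4, and OP_CONNECT is 8, so `register(this.selector, 8)` registers for connect events and `interestOps(5)` enables read plus write. The request types 11 and 100 are the ones used for ping and auth Packets in the SendThread listing. The small helper below (a hypothetical `InterestOps` class) decodes such bit masks.

```java
import java.nio.channels.SelectionKey;

/** Sketch: turn a raw interest-op bit mask into readable names. */
final class InterestOps {
    private InterestOps() {}

    static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_READ) != 0) {
            sb.append("READ ");
        }
        if ((ops & SelectionKey.OP_WRITE) != 0) {
            sb.append("WRITE ");
        }
        if ((ops & SelectionKey.OP_CONNECT) != 0) {
            sb.append("CONNECT ");
        }
        if ((ops & SelectionKey.OP_ACCEPT) != 0) {
            sb.append("ACCEPT ");
        }
        return sb.toString().trim();
    }
}
```

Read this way, `i & -5` in disableWrite() clears only the write bit, because -5 is the bitwise complement of 4.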

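(4) SendThread

SendThread is the core I/O scheduling thread inside ClientCnxn. It maintains the session lifecycle between client and server and implements heartbeat detection by sending a PING packet to the server at regular intervals. If the TCP connection drops during the session, it reconnects automatically and transparently.

It also manages all request sending and response receiving for the client: it converts upper-level client API calls into the corresponding protocol requests, sends them to the server, and completes the return of synchronous calls and the callbacks of asynchronous calls. In addition, it passes events from the server to EventThread for processing.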
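
The heartbeat schedule can be read off the run() loop in the listing that follows: the time until the next PING is half the read timeout, minus the time since the last send, minus one extra second once more than a second has passed. The sketch below isolates that arithmetic; `PingTimer` is a hypothetical helper, and both arguments are in milliseconds.

```java
/** Sketch of the PING scheduling arithmetic used in SendThread.run(). */
final class PingTimer {
    private PingTimer() {}

    /** Milliseconds until the next PING is due; a value <= 0 means "send one now". */
    static int timeToNextPing(int readTimeoutMs, int idleSendMs) {
        // Aim for half the read timeout, and subtract a second of slack
        // once the connection has been idle for more than a second.
        return readTimeoutMs / 2 - idleSendMs - (idleSendMs > 1000 ? 1000 : 0);
    }
}
```

With a read timeout of 20000 ms, a connection that has not sent anything for 9500 ms is already due for a PING, since 10000 - 9500 - 1000 is negative.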

class SendThread extends ZooKeeperThread {
private long lastPingSentNs;
private final ClientCnxnSocket clientCnxnSocket;
private Random r = new Random(System.nanoTime());
private boolean isFirstConnect = true;
private InetSocketAddress rwServerAddress = null;
private static final int minPingRwTimeout = 100;
private static final int maxPingRwTimeout = 60000;
private int pingRwTimeout = 100;
private boolean saslLoginFailed = false;
private static final String RETRY_CONN_MSG = ", closing socket connection and attempting reconnect";

// Read and dispatch a response from the server
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
// Negative XIDs mark special replies: -1 is a watch notification, -2 a ping reply, -4 an auth reply
if (replyHdr.getXid() == -2) {
if (ClientCnxn.LOG.isDebugEnabled()) {
ClientCnxn.LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(ClientCnxn.this.sessionId) + " after " + (System.nanoTime() - this.lastPingSentNs) / 1000000L + "ms");
}

} else if (replyHdr.getXid() == -4) {
if (replyHdr.getErr() == Code.AUTHFAILED.intValue()) {
ClientCnxn.this.state = States.AUTH_FAILED;
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.AuthFailed, (String)null));
}

if (ClientCnxn.LOG.isDebugEnabled()) {
ClientCnxn.LOG.debug("Got auth sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
}

} else if (replyHdr.getXid() == -1) {
if (ClientCnxn.LOG.isDebugEnabled()) {
ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
}

// Build the watcher event by deserializing the response body
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// If a chroot is set, strip it so that the event carries the client-side path
if (ClientCnxn.this.chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) {
event.setPath("/");
} else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) {
event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
} else {
ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
}
}

// Wrap the WatcherEvent in a WatchedEvent and add it to the EventThread queue
WatchedEvent we = new WatchedEvent(event);
if (ClientCnxn.LOG.isDebugEnabled()) {
ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
}

ClientCnxn.this.eventThread.queueEvent(we);
} else if (this.clientTunneledAuthenticationInProgress()) {
// SASL authentication in progress: pass the server's token to the SASL client
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia, "token");
ClientCnxn.this.zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
} else {
// Regular request/response case
ClientCnxn.Packet packet;
// Lock the pending queue and remove the oldest Packet
synchronized(ClientCnxn.this.pendingQueue) {
if (ClientCnxn.this.pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}

packet = (ClientCnxn.Packet)ClientCnxn.this.pendingQueue.remove();
}

try {
// A reply XID that differs from the request XID means replies are out of order: throw
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet);
}

// Copy the deserialized reply header into the Packet
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0L) {
ClientCnxn.this.lastZxid = replyHdr.getZxid();
}

// Deserialize the response body into the Packet
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}

if (ClientCnxn.LOG.isDebugEnabled()) {
ClientCnxn.LOG.debug("Reading reply sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId) + ", packet:: " + packet);
}
} finally {
// Always finish the Packet: register watches and notify the caller
ClientCnxn.this.finishPacket(packet);
}

}
}

SendThread(ClientCnxnSocket clientCnxnSocket) {
super(ClientCnxn.makeThreadName("-SendThread()"));
ClientCnxn.this.state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
this.setDaemon(true);
}

States getZkState() {
return ClientCnxn.this.state;
}

ClientCnxnSocket getClientCnxnSocket() {
return this.clientCnxnSocket;
}

void primeConnection() throws IOException {
ClientCnxn.LOG.info("Socket connection established to " + this.clientCnxnSocket.getRemoteSocketAddress() + ", initiating session");
this.isFirstConnect = false;
long sessId = ClientCnxn.this.seenRwServerBefore ? ClientCnxn.this.sessionId : 0L;
ConnectRequest conReq = new ConnectRequest(0, ClientCnxn.this.lastZxid, ClientCnxn.this.sessionTimeout, sessId, ClientCnxn.this.sessionPasswd);
synchronized(ClientCnxn.this.outgoingQueue) {
if (!ClientCnxn.disableAutoWatchReset) {
List<String> dataWatches = ClientCnxn.this.zooKeeper.getDataWatches();
List<String> existWatches = ClientCnxn.this.zooKeeper.getExistWatches();
List<String> childWatches = ClientCnxn.this.zooKeeper.getChildWatches();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()) {
Iterator<String> dataWatchesIter = this.prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = this.prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = this.prependChroot(childWatches).iterator();
long setWatchesLastZxid = ClientCnxn.this.lastZxid;

while(dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList();
List<String> existWatchesBatch = new ArrayList();
List<String> childWatchesBatch = new ArrayList();

String watch;
for(int batchLength = 0; batchLength < 131072; batchLength += watch.length()) {
if (dataWatchesIter.hasNext()) {
watch = (String)dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = (String)existWatchesIter.next();
existWatchesBatch.add(watch);
} else {
if (!childWatchesIter.hasNext()) {
break;
}

watch = (String)childWatchesIter.next();
childWatchesBatch.add(watch);
}
}

SetWatches sw = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
RequestHeader h = new RequestHeader();
h.setType(101);
h.setXid(-8);
ClientCnxn.Packet packet = new ClientCnxn.Packet(h, new ReplyHeader(), sw, (Record)null, (WatchRegistration)null);
ClientCnxn.this.outgoingQueue.addFirst(packet);
}
}
}

Iterator var22 = ClientCnxn.this.authInfo.iterator();

while(true) {
if (!var22.hasNext()) {
ClientCnxn.this.outgoingQueue.addFirst(new ClientCnxn.Packet((RequestHeader)null, (ReplyHeader)null, conReq, (Record)null, (WatchRegistration)null, ClientCnxn.this.readOnly));
break;
}

ClientCnxn.AuthData id = (ClientCnxn.AuthData)var22.next();
ClientCnxn.this.outgoingQueue.addFirst(new ClientCnxn.Packet(new RequestHeader(-4, 100), (ReplyHeader)null, new AuthPacket(0, id.scheme, id.data), (Record)null, (WatchRegistration)null));
}
}

this.clientCnxnSocket.enableReadWriteOnly();
if (ClientCnxn.LOG.isDebugEnabled()) {
ClientCnxn.LOG.debug("Session establishment request sent on " + this.clientCnxnSocket.getRemoteSocketAddress());
}

}

private List<String> prependChroot(List<String> paths) {
if (ClientCnxn.this.chrootPath != null && !paths.isEmpty()) {
for(int i = 0; i < paths.size(); ++i) {
String clientPath = (String)paths.get(i);
String serverPath;
if (clientPath.length() == 1) {
serverPath = ClientCnxn.this.chrootPath;
} else {
serverPath = ClientCnxn.this.chrootPath + clientPath;
}

paths.set(i, serverPath);
}
}

return paths;
}

// Called from run()
private void sendPing() {
this.lastPingSentNs = System.nanoTime();
// Pings always use the fixed XID -2
RequestHeader h = new RequestHeader(-2, 11);
ClientCnxn.this.queuePacket(h, (ReplyHeader)null, (Record)null, (Record)null, (AsyncCallback)null, (String)null, (String)null, (Object)null, (WatchRegistration)null);
}

// Start connecting; called from run()
private void startConnect(InetSocketAddress addr) throws IOException {
this.saslLoginFailed = false;
ClientCnxn.this.state = States.CONNECTING;
this.setName(this.getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
if (ZooKeeperSaslClient.isEnabled()) {
// When SASL is enabled, create the SASL client
try {
ClientCnxn.this.zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr));
} catch (LoginException var3) {
ClientCnxn.LOG.warn("SASL configuration failed: " + var3 + " Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.");
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.AuthFailed, (String)null));
this.saslLoginFailed = true;
}
}

this.logStartConnect(addr);
// Register the channel and connect the socket
this.clientCnxnSocket.connect(addr);
}

private void logStartConnect(InetSocketAddress addr) {
String msg = "Opening socket connection to server " + addr;
if (ClientCnxn.this.zooKeeperSaslClient != null) {
msg = msg + ". " + ClientCnxn.this.zooKeeperSaslClient.getConfigStatus();
}

ClientCnxn.LOG.info(msg);
}

public void run() {
// Hand this thread and the session id to ClientCnxnSocket, then reset its timestamps
this.clientCnxnSocket.introduce(this, ClientCnxn.this.sessionId);
this.clientCnxnSocket.updateNow();
this.clientCnxnSocket.updateLastSendAndHeard();
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds
InetSocketAddress serverAddress = null;

// Loop for as long as the client state is alive
while(ClientCnxn.this.state.isAlive()) {
try {
// Socket not connected yet: pick a server and connect
if (!this.clientCnxnSocket.isConnected()) {
// On a reconnect (not the first attempt), first sleep for a random 0 to 999 ms
if (!this.isFirstConnect) {
try {
Thread.sleep((long)this.r.nextInt(1000));
} catch (InterruptedException var10) {
ClientCnxn.LOG.warn("Unexpected exception", var10);
}
}

// Leave the loop if the client is closing or no longer alive
if (ClientCnxn.this.closing || !ClientCnxn.this.state.isAlive()) {
break;
}

// Pick a server address: a read/write server found earlier, otherwise the next one from HostProvider
if (this.rwServerAddress != null) {
serverAddress = this.rwServerAddress;
this.rwServerAddress = null;
} else {
serverAddress = ClientCnxn.this.hostProvider.next(1000L);
}

// Call startConnect to establish the connection
this.startConnect(serverAddress);
this.clientCnxnSocket.updateLastSendAndHeard();
}

int to;
if (ClientCnxn.this.state.isConnected()) {
// Currently connected
if (ClientCnxn.this.zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (ClientCnxn.this.zooKeeperSaslClient.getSaslState() == SaslState.INITIAL) {
try {
ClientCnxn.this.zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException var9) {
ClientCnxn.LOG.error("SASL authentication with Zookeeper Quorum member failed: " + var9);
ClientCnxn.this.state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}

KeeperState authState = ClientCnxn.this.zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
ClientCnxn.this.state = States.AUTH_FAILED;
sendAuthEvent = true;
} else if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}

if (sendAuthEvent) {
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, authState, (String)null));
}
}

to = ClientCnxn.this.readTimeout - this.clientCnxnSocket.getIdleRecv();
} else {
// Not connected yet
to = ClientCnxn.this.connectTimeout - this.clientCnxnSocket.getIdleRecv();
}

// Timed out: nothing was heard from the server within the allowed time
if (to <= 0) {
String warnInfo = "Client session timed out, have not heard from server in " + this.clientCnxnSocket.getIdleRecv() + "ms for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId);
ClientCnxn.LOG.warn(warnInfo);
throw new ClientCnxn.SessionTimeoutException(warnInfo);
}

// While connected, send a ping whenever one is due
if (ClientCnxn.this.state.isConnected()) {
int timeToNextPing = ClientCnxn.this.readTimeout / 2 - this.clientCnxnSocket.getIdleSend() - (this.clientCnxnSocket.getIdleSend() > 1000 ? 1000 : 0);
if (timeToNextPing > 0 && this.clientCnxnSocket.getIdleSend() <= 10000) {
if (timeToNextPing < to) {
to = timeToNextPing;
}
} else {
this.sendPing();
this.clientCnxnSocket.updateLastSend();
}
}

// In the CONNECTEDREADONLY state, periodically probe for a read/write server and shrink 'to' accordingly
if (ClientCnxn.this.state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int)(now - lastPingRwServer);
if (idlePingRwServer >= this.pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
this.pingRwTimeout = Math.min(2 * this.pingRwTimeout, 60000);
this.pingRwServer();
}

to = Math.min(to, this.pingRwTimeout - idlePingRwServer);
}

// Delegate to the underlying transport
this.clientCnxnSocket.doTransport(to, ClientCnxn.this.pendingQueue, ClientCnxn.this.outgoingQueue, ClientCnxn.this);
} catch (Throwable var11) {
if (ClientCnxn.this.closing) {
if (ClientCnxn.LOG.isDebugEnabled()) {
ClientCnxn.LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(ClientCnxn.this.getSessionId()) + " : " + var11.getMessage());
}
break;
}

if (var11 instanceof ClientCnxn.SessionExpiredException) {
ClientCnxn.LOG.info(var11.getMessage() + ", closing socket connection");
} else if (var11 instanceof ClientCnxn.SessionTimeoutException) {
ClientCnxn.LOG.info(var11.getMessage() + ", closing socket connection and attempting reconnect");
} else if (var11 instanceof ClientCnxn.EndOfStreamException) {
ClientCnxn.LOG.info(var11.getMessage() + ", closing socket connection and attempting reconnect");
} else if (var11 instanceof ClientCnxn.RWServerFoundException) {
ClientCnxn.LOG.info(var11.getMessage());
} else if (var11 instanceof SocketException) {
ClientCnxn.LOG.info("Socket error occurred: {}: {}", serverAddress, var11.getMessage());
} else {
ClientCnxn.LOG.warn("Session 0x{} for server {}, unexpected error{}", new Object[]{Long.toHexString(ClientCnxn.this.getSessionId()), serverAddress, ", closing socket connection and attempting reconnect", var11});
}

this.cleanup();
if (ClientCnxn.this.state.isAlive()) {
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.Disconnected, (String)null));
}

this.clientCnxnSocket.updateNow();
this.clientCnxnSocket.updateLastSendAndHeard();
}
}

// The loop has ended: clean up and close the socket
this.cleanup();
this.clientCnxnSocket.close();
// If the state is still alive at this point, queue a Disconnected event
if (ClientCnxn.this.state.isAlive()) {
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.Disconnected, (String)null));
}

ZooTrace.logTraceMessage(ClientCnxn.LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x" + Long.toHexString(ClientCnxn.this.getSessionId()));
}

private void pingRwServer() throws ClientCnxn.RWServerFoundException, UnknownHostException {
String result = null;
InetSocketAddress addr = ClientCnxn.this.hostProvider.next(0L);
ClientCnxn.LOG.info("Checking server " + addr + " for being r/w. Timeout " + this.pingRwTimeout);
Socket sock = null;
BufferedReader br = null;

try {
sock = new Socket(addr.getHostName(), addr.getPort());
sock.setSoLinger(false, -1);
sock.setSoTimeout(1000);
sock.setTcpNoDelay(true);
sock.getOutputStream().write("isro".getBytes());
sock.getOutputStream().flush();
sock.shutdownOutput();
br = new BufferedReader(new InputStreamReader(sock.getInputStream()));
result = br.readLine();
} catch (ConnectException var21) {
} catch (IOException var22) {
ClientCnxn.LOG.warn("Exception while seeking for r/w server " + var22.getMessage(), var22);
} finally {
if (sock != null) {
try {
sock.close();
} catch (IOException var20) {
ClientCnxn.LOG.warn("Unexpected exception", var20);
}
}

if (br != null) {
try {
br.close();
} catch (IOException var19) {
ClientCnxn.LOG.warn("Unexpected exception", var19);
}
}

}

if ("rw".equals(result)) {
this.pingRwTimeout = 100;
this.rwServerAddress = addr;
throw new ClientCnxn.RWServerFoundException("Majority server found at " + addr.getHostName() + ":" + addr.getPort());
}
}

private void cleanup() {
this.clientCnxnSocket.cleanup();
Iterator var2;
ClientCnxn.Packet p;
synchronized(ClientCnxn.this.pendingQueue) {
var2 = ClientCnxn.this.pendingQueue.iterator();

while(true) {
if (!var2.hasNext()) {
ClientCnxn.this.pendingQueue.clear();
break;
}

p = (ClientCnxn.Packet)var2.next();
ClientCnxn.this.conLossPacket(p);
}
}

synchronized(ClientCnxn.this.outgoingQueue) {
var2 = ClientCnxn.this.outgoingQueue.iterator();

while(var2.hasNext()) {
p = (ClientCnxn.Packet)var2.next();
ClientCnxn.this.conLossPacket(p);
}

ClientCnxn.this.outgoingQueue.clear();
}
}

void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException {
ClientCnxn.this.negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (ClientCnxn.this.negotiatedSessionTimeout <= 0) {
ClientCnxn.this.state = States.CLOSED;
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.Expired, (String)null));
ClientCnxn.this.eventThread.queueEventOfDeath();
String warnInfo = "Unable to reconnect to ZooKeeper service, session 0x" + Long.toHexString(ClientCnxn.this.sessionId) + " has expired";
ClientCnxn.LOG.warn(warnInfo);
throw new ClientCnxn.SessionExpiredException(warnInfo);
} else {
if (!ClientCnxn.this.readOnly && isRO) {
ClientCnxn.LOG.error("Read/write client got connected to read-only server");
}

ClientCnxn.this.readTimeout = ClientCnxn.this.negotiatedSessionTimeout * 2 / 3;
ClientCnxn.this.connectTimeout = ClientCnxn.this.negotiatedSessionTimeout / ClientCnxn.this.hostProvider.size();
ClientCnxn.this.hostProvider.onConnected();
ClientCnxn.this.sessionId = _sessionId;
ClientCnxn.this.sessionPasswd = _sessionPasswd;
ClientCnxn.this.state = isRO ? States.CONNECTEDREADONLY : States.CONNECTED;
ClientCnxn var10000 = ClientCnxn.this;
var10000.seenRwServerBefore |= !isRO;
ClientCnxn.LOG.info("Session establishment complete on server " + this.clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x" + Long.toHexString(ClientCnxn.this.sessionId) + ", negotiated timeout = " + ClientCnxn.this.negotiatedSessionTimeout + (isRO ? " (READ-ONLY mode)" : ""));
KeeperState eventState = isRO ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, eventState, (String)null));
}
}

void close() {
ClientCnxn.this.state = States.CLOSED;
this.clientCnxnSocket.wakeupCnxn();
}

void testableCloseSocket() throws IOException {
this.clientCnxnSocket.testableCloseSocket();
}

public boolean clientTunneledAuthenticationInProgress() {
if (!ZooKeeperSaslClient.isEnabled()) {
return false;
} else if (this.saslLoginFailed) {
return false;
} else {
return ClientCnxn.this.zooKeeperSaslClient == null ? true : ClientCnxn.this.zooKeeperSaslClient.clientTunneledAuthenticationInProgress();
}
}

public void sendPacket(ClientCnxn.Packet p) throws IOException {
this.clientCnxnSocket.sendPacket(p);
}
}
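
The timeout arithmetic is spread across run() and onConnected() in the listing above, so it may help to see it in isolation. The following is a minimal sketch rather than ZooKeeper code: the class and method names are hypothetical, and only the formulas and the literals 1000, 10000 and 60000 are taken from the listing.

```java
// Sketch only: mirrors the arithmetic in SendThread.run() and onConnected().
// SendThreadTimingSketch and its method names are hypothetical.
final class SendThreadTimingSketch {
    // Upper bound on idle-send time before a ping is forced (ms).
    static final int MAX_SEND_PING_INTERVAL = 10_000;

    // onConnected(): readTimeout = negotiatedSessionTimeout * 2 / 3
    static int readTimeout(int negotiatedSessionTimeout) {
        return negotiatedSessionTimeout * 2 / 3;
    }

    // onConnected(): connectTimeout = negotiatedSessionTimeout / hostProvider.size()
    static int connectTimeout(int negotiatedSessionTimeout, int serverCount) {
        return negotiatedSessionTimeout / serverCount;
    }

    // run(): time left before the next ping is due; the extra 1000 ms is
    // subtracted once the connection has been idle for more than a second.
    static int timeToNextPing(int readTimeout, int idleSendMs) {
        return readTimeout / 2 - idleSendMs - (idleSendMs > 1000 ? 1000 : 0);
    }

    // run(): ping when the timer has run out or the idle time exceeds the cap.
    static boolean shouldPingNow(int readTimeout, int idleSendMs) {
        return timeToNextPing(readTimeout, idleSendMs) <= 0
                || idleSendMs > MAX_SEND_PING_INTERVAL;
    }

    // run(): back-off between probes for a read/write server, capped at 60 s.
    static int nextPingRwTimeout(int currentTimeout) {
        return Math.min(2 * currentTimeout, 60_000);
    }

    public static void main(String[] args) {
        int negotiated = 30_000; // assumed negotiated session timeout in ms
        int rt = readTimeout(negotiated);
        System.out.println("readTimeout = " + rt);
        System.out.println("connectTimeout (3 servers) = " + connectTimeout(negotiated, 3));
        System.out.println("ping now after 500 ms idle? " + shouldPingNow(rt, 500));
        System.out.println("ping now after 9500 ms idle? " + shouldPingNow(rt, 9500));
    }
}
```

With these formulas a connected client pings at roughly half of readTimeout, that is about one third of the negotiated session timeout, which leaves room for a retry before the server expires the session.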

(5)EventThread

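EventThread is an inner class of ClientCnxn. It handles client-side events and triggers the Watchers that the client has registered. It holds a waitingEvents queue that temporarily stores the objects waiting to be triggered: the Watchers registered by the client (wrapped in a WatcherSetEventPair) and the AsyncCallback instances registered through the asynchronous API (carried in a Packet).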

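EventThread keeps taking objects from waitingEvents, identifies their concrete type, and calls process (for a Watcher) or processResult (for an AsyncCallback) to fire the event or run the callback.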
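
The take-and-dispatch loop described above, together with its shutdown rule, can be sketched in a few lines. This is a stripped-down model under stated assumptions, not ZooKeeper's implementation: WatcherEvent, CallbackPacket and EventLoopSketch are hypothetical stand-ins for WatcherSetEventPair, Packet and EventThread, and the dispatch step only records what it would have called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: a simplified model of the EventThread loop.
final class EventLoopSketch {
    // Hypothetical stand-in for WatcherSetEventPair.
    static final class WatcherEvent {
        final String description;
        WatcherEvent(String description) { this.description = description; }
    }

    // Hypothetical stand-in for a Packet that carries an AsyncCallback.
    static final class CallbackPacket {
        final int rc;
        final String path;
        CallbackPacket(int rc, String path) { this.rc = rc; this.path = path; }
    }

    // Sentinel compared by identity, playing the role of eventOfDeath.
    private static final Object EVENT_OF_DEATH = new Object();

    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> log = new ArrayList<>();

    void queue(Object event) { waitingEvents.add(event); }
    void queueEventOfDeath() { waitingEvents.add(EVENT_OF_DEATH); }

    // Take, dispatch by type, and exit only once killed AND drained.
    void runLoop() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    processEvent(event);
                }
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    private void processEvent(Object event) {
        if (event instanceof WatcherEvent) {
            // The real code calls watcher.process(event) for every watcher in the set.
            log.add("watcher: " + ((WatcherEvent) event).description);
        } else if (event instanceof CallbackPacket) {
            // The real code calls the matching XxxCallback.processResult(...).
            CallbackPacket p = (CallbackPacket) event;
            log.add("callback: rc=" + p.rc + " path=" + p.path);
        }
    }

    static List<String> demo() {
        EventLoopSketch loop = new EventLoopSketch();
        loop.queue(new WatcherEvent("NodeDataChanged /a"));
        loop.queueEventOfDeath();
        loop.queue(new CallbackPacket(-4, "/b")); // queued after the death event
        loop.runLoop();
        return loop.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The point of the sentinel is that shutdown does not drop work: an item queued after the death event is still delivered, because the loop only exits once the queue is empty.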

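// EventThread keeps taking objects from the waitingEvents queue, identifies whether each one
// belongs to a Watcher or an AsyncCallback, and calls process or processResult accordingly.
// A Watcher delivers change notifications.
// AsyncCallback belongs to the asynchronous side of the ZooKeeper client API: creating a node,
// deleting a node, checking whether a node exists, getting children, getting data, setting data,
// and getting or setting ACLs all have asynchronous variants. They use seven callback types:
// StringCallback, VoidCallback, StatCallback, ACLCallback, ChildrenCallback, Children2Callback, DataCallback.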
class EventThread extends ZooKeeperThread {
// Queue of pending events
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue();
// This is really the queued session state, valid until the event thread actually processes the event and hands it to the watcher.
private volatile KeeperState sessionState;
private volatile boolean wasKilled;
private volatile boolean isRunning;

EventThread() {
super(ClientCnxn.makeThreadName("-EventThread"));
this.sessionState = KeeperState.Disconnected;
this.wasKilled = false;
this.isRunning = false;
this.setDaemon(true);
}
// Adds a WatchedEvent to the waitingEvents queue.
// SendThread's readResponse(ByteBuffer incomingBuffer) reads each response from the server;
// if it is an event, it is deserialized back into a WatchedEvent and passed to this method.
public void queueEvent(WatchedEvent event) {
if (event.getType() != EventType.None || this.sessionState != event.getState()) {
this.sessionState = event.getState();
ClientCnxn.WatcherSetEventPair pair = new ClientCnxn.WatcherSetEventPair(ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()), event);
this.waitingEvents.add(pair);
}
}

@SuppressFBWarnings({"JLM_JSR166_UTILCONCURRENT_MONITORENTER"})
public void queuePacket(ClientCnxn.Packet packet) {
if (this.wasKilled) {
synchronized(this.waitingEvents) {
if (this.isRunning) {
this.waitingEvents.add(packet);
} else {
this.processEvent(packet);
}
}
} else {
this.waitingEvents.add(packet);
}

}

public void queueEventOfDeath() {
this.waitingEvents.add(ClientCnxn.this.eventOfDeath);
}

@SuppressFBWarnings({"JLM_JSR166_UTILCONCURRENT_MONITORENTER"})
public void run() {
try {
this.isRunning = true;
// Loop forever, taking a WatcherSetEventPair or a Packet from the waitingEvents queue
while(true) {
Object event = this.waitingEvents.take();
if (event == ClientCnxn.this.eventOfDeath) {
this.wasKilled = true;
} else {
// Process the event
this.processEvent(event);
}

if (this.wasKilled) {
synchronized(this.waitingEvents) {
if (this.waitingEvents.isEmpty()) {
this.isRunning = false;
break;
}
}
}
}
} catch (InterruptedException var5) {
ClientCnxn.LOG.error("Event thread exiting due to interruption", var5);
}

ClientCnxn.LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(ClientCnxn.this.getSessionId()));
}

private void processEvent(Object event) {
try {
// Watcher notification
if (event instanceof ClientCnxn.WatcherSetEventPair) {
// each watcher will process the event
ClientCnxn.WatcherSetEventPair pair = (ClientCnxn.WatcherSetEventPair)event;
// Iterate over the watcher set and call each watcher's process(WatchedEvent event) callback
Iterator var3 = pair.watchers.iterator();

while(var3.hasNext()) {
Watcher watcher = (Watcher)var3.next();

try {
// This is the process(WatchedEvent event) method that every Watcher implementation must provide
watcher.process(pair.event);
} catch (Throwable var11) {
ClientCnxn.LOG.error("Error while calling watcher ", var11);
}
}
}
// Asynchronous callback
else {
ClientCnxn.Packet p = (ClientCnxn.Packet)event;
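// rc is the result code returned by the server; common values:
// 0: the call succeeded; -4: the connection to the server was lost
// -110: the node already exists; -112: the session has expired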
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}

if (p.cb == null) {
ClientCnxn.LOG.warn("Somehow a null cb got to EventThread!");
}
// The response is NOT for exists/setData/setACL; those three share StatCallback and are handled in the else branch at the end
else if (!(p.response instanceof ExistsResponse) && !(p.response instanceof SetDataResponse) && !(p.response instanceof SetACLResponse)) {


// Response to an asynchronous getData call
if (p.response instanceof GetDataResponse) {
DataCallback cbxxxx = (DataCallback)p.cb;
GetDataResponse rspxxx = (GetDataResponse)p.response;
// rc == 0 means the call succeeded
if (rc == 0) {
cbxxxx.processResult(rc, clientPath, p.ctx, rspxxx.getData(), rspxxx.getStat());
} else {
cbxxxx.processResult(rc, clientPath, p.ctx, (byte[])null, (Stat)null);
}
}
// Response to an asynchronous getACL call
else if (p.response instanceof GetACLResponse) {
ACLCallback cbxxxxx = (ACLCallback)p.cb;
GetACLResponse rspxxxx = (GetACLResponse)p.response;
if (rc == 0) {
cbxxxxx.processResult(rc, clientPath, p.ctx, rspxxxx.getAcl(), rspxxxx.getStat());
} else {
cbxxxxx.processResult(rc, clientPath, p.ctx, (List)null, (Stat)null);
}
}
// Response to an asynchronous getChildren call
else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cbxxxxxx = (ChildrenCallback)p.cb;
GetChildrenResponse rspxxxxx = (GetChildrenResponse)p.response;
if (rc == 0) {
cbxxxxxx.processResult(rc, clientPath, p.ctx, rspxxxxx.getChildren());
} else {
cbxxxxxx.processResult(rc, clientPath, p.ctx, (List)null);
}
}
// Response to an asynchronous getChildren call; unlike the branch above, it also returns the node's Stat
else if (p.response instanceof GetChildren2Response) {
Children2Callback cbxxxxxxx = (Children2Callback)p.cb;
GetChildren2Response rsp = (GetChildren2Response)p.response;
if (rc == 0) {
cbxxxxxxx.processResult(rc, clientPath, p.ctx, rsp.getChildren(), rsp.getStat());
} else {
cbxxxxxxx.processResult(rc, clientPath, p.ctx, (List)null, (Stat)null);
}
}
// Response to an asynchronous create call
else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback)p.cb;
CreateResponse rspx = (CreateResponse)p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, ClientCnxn.this.chrootPath == null ? rspx.getPath() : rspx.getPath().substring(ClientCnxn.this.chrootPath.length()));
} else {
cb.processResult(rc, clientPath, p.ctx, (String)null);
}
}
//MultiResponse
else if (p.response instanceof MultiResponse) {
MultiCallback cbx = (MultiCallback)p.cb;
MultiResponse rspxx = (MultiResponse)p.response;
if (rc == 0) {
List<OpResult> results = rspxx.getResultList();
int newRc = rc;
Iterator var9 = results.iterator();

while(var9.hasNext()) {
OpResult result = (OpResult)var9.next();
if (result instanceof ErrorResult && Code.OK.intValue() != (newRc = ((ErrorResult)result).getErr())) {
break;
}
}

cbx.processResult(newRc, clientPath, p.ctx, results);
} else {
cbx.processResult(rc, clientPath, p.ctx, (List)null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cbxx = (VoidCallback)p.cb;
cbxx.processResult(rc, clientPath, p.ctx);
}
} else {
StatCallback cbxxx = (StatCallback)p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cbxxx.processResult(rc, clientPath, p.ctx, ((ExistsResponse)p.response).getStat());
} else if (p.response instanceof SetDataResponse) {
cbxxx.processResult(rc, clientPath, p.ctx, ((SetDataResponse)p.response).getStat());
} else if (p.response instanceof SetACLResponse) {
cbxxx.processResult(rc, clientPath, p.ctx, ((SetACLResponse)p.response).getStat());
}
} else {
cbxxx.processResult(rc, clientPath, p.ctx, (Stat)null);
}
}
}
} catch (Throwable var12) {
ClientCnxn.LOG.error("Caught unexpected throwable", var12);
}

}
}
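
Two details in processEvent are easy to miss in the decompiled form: the create branch strips the chroot prefix from the server-side path before handing it to the StringCallback, and the multi branch reports the first non-OK ErrorResult code as the overall result code. The sketch below isolates both. It is an illustration under assumptions, not ZooKeeper code: the class and method names are hypothetical, and sub-results are modelled as Integer error codes with null standing for a result that is not an ErrorResult.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: isolates two small rules from processEvent.
final class ProcessEventDetailsSketch {
    // create: the server returns the full path, the callback sees the client-side path.
    static String stripChroot(String chrootPath, String serverPath) {
        return chrootPath == null
                ? serverPath
                : serverPath.substring(chrootPath.length());
    }

    // multi: scan the sub-results in order and stop at the first error code that is not OK (0).
    // A null entry models a sub-result that is not an ErrorResult and is skipped.
    static int multiResultCode(List<Integer> errorCodes) {
        int newRc = 0;
        for (Integer err : errorCodes) {
            if (err != null && (newRc = err) != 0) {
                break;
            }
        }
        return newRc;
    }

    public static void main(String[] args) {
        System.out.println(stripChroot("/app", "/app/lock-0000000001"));
        System.out.println(multiResultCode(Arrays.asList(null, 0, -110, -2)));
    }
}
```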

References:

《从Paxos到Zookeeper-分布式一致性原理与实践》 (From Paxos to ZooKeeper: Distributed Consistency Principles and Practice)