ZooKeeper(三)开源客户端ZkClient和Curator

ZooKeeper(三)开源客户端

一. ZkClient

ZkClient是GitHub上的一个开源的ZooKeeper客户端,它在ZK原生API接口上进行了包装,同时内部实现了诸如Session超时重连、Watcher反复注册等功能。

1.1 引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
</dependency>
</dependencies>

1.2 创建会话

构造函数API:

1
2
3
4
5
6
7
public ZkClient(String serverstring);
public ZkClient(String zkServers, int connectionTimeout);
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout);
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer);
public ZkClient(IZkConnection connection);
public ZkClient(IZkConnection connection, int connectionTimeout);
public ZkClient(IZkConnection connection, int connectionTimeout, ZkSerializer zkSerializer);

参数:

参数名 说明
zkServers 指ZooKeeper服务器列表,由英文状态逗号分开的host:port字符串组成,每一个都代表一台ZooKeeper机器,例如,192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
sessionTimeout 会话超时时间,单位为毫秒,默认是30000ms
connectionTimeout 连接创建超时时间,单位为毫秒。此参数表明如果在这个时间段内还是无法和ZooKeeper建立连接,就抛出异常
connection IZkConnection接口的实现类
zkSerializer 自定义序列化器
  • ZooKeeper会话的建立是一个异步的过程,开发人员需要自己来进行等待处理,ZkClient通过内部包装,将这个异步的会话创建过程同步化了,方便开发者使用。
  • IZkConnection接口是对ZooKeeper原生接口最直接的包装和交互层,包含了增删改查一系列接口的定义,有两种默认实现 ZkConnectionInMemoryConnection
  • ZkClient中定义了ZkSerializer接口允许用户传入一个序列化实现,如Hessian或Kryo,默认情况下会使用Java自带的序列化方式。
  • ZkClient不再需要提供Watcher参数,其引入了Listener来实现Watcher的注册。
1
2
3
4
5
6
7
// 创建ZK客户端
public class Create_Session_Sample {
public static void main(String[] args) throws IOException, InterruptedException {
ZkClient zkClient = new ZkClient("domain1.book.zookeeper:2181", 5000);
System.out.println("ZooKeeper session established.")
}
}

1.3 创建节点

createEphemeral 接口是创建临时节点,createPersistentSequential 则是创建持久顺序节点。

ZK原生API无法递归创建节点,只能在父节点存在时创建子节点,所以每次都要检查父节点是否存在。ZkClient通过 createParents 参数在内部帮助我们递归建立父节点。

1.4 删除节点

deleteRecursive 自动完成逐层遍历删除节点的操作。

1.5 读取数据

1
2
3
4
5
6
7
List<String> getChildren(String path);
// 注册事件监听
List<String> subscribeChildChanges(String path, IZkChildListener listener);

public interface IZkChildListener {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
}

IZkChildListener 参数:

参数名 说明
parentPath 子节点变更通知对应的父节点的节点路径
currentChilds 子节点的相对路径列表,如果没有子节点,会传入null

节点注册监听后会收到如下事件通知:

  • 客户端可以对一个不存在的节点进行子节点变更的监听。
  • 一旦客户端对一个节点注册子节点列表变更监听之后,当该节点的子节点列表发送变更时,服务端都会通知并将最新的子节点列表发送给客户端。
  • 该节点本身的创建或删除也会通知到客户端。
  • ZkClient的Listener不同于Watcher,一旦注册就会一直生效。
1
2
3
<T extends Object> T readData(String path);
<T extends Object> T readData(String path, boolean returnNullIfPathNotExists);
<T extends Object> T readData(String path, Stat stat);

1
2
3
4
5
// 同样通过注册Listener来实现监听
public interface IZkDataListener {
public void handleChildChange(String dataPath, Object data) throws Exception;
public void handleChildDeleted(String dataPath) throws Exception;
}
参数名 说明
dataPath 事件通知对应的节点路径
data 最新的数据内容

1.6 更新数据

1
2
void writeData(String path, Object data);
void writeData(final String path, Object data, final int expectedVersion);

检测指定节点是否存在:

1
boolean exists(final String path);

二. Curator

Curator是由Netflix开发的一套ZooKeeper客户端框架,和ZkClient一样解决了非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等。Curator在ZK原生API的基础上进行了包装,提供了一套易用性和可读性更强的Fluent风格的API框架。除此之外,Curator还提供了ZooKeeper各种应用场景(Recipe,如共享锁服务、Master选举机制和分布式计数器)的抽象封装。

2.1 引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
</dependency>

2.2 创建会话

  1. 使用 CuratorFrameworkFactory 工厂类的两个静态方法来创建一个客户端

    1
    2
    3
    static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy);

    static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);
  2. 通过调用 CuratorFramework 中的 start() 方法来启动会话。

Curator通过接口 RetryPolicy 来让用户自定义重试策略:

1
boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);

使用Curator创建一个ZK客户端会话:

ExponentialBackoffRetry 是默认重试策略之一:

1
2
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries);
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs);

给定一个初始sleep时间baseSleepTimeMs,在此基础上结合重试次数,通过以下公式计算出当前需要sleep的时间:

1
当前sleep时间 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))

使用Fluent风格的API接口创建会话:

使用Curator创建含隔离命名空间的会话:

1
2
3
4
5
6
7
//实现不同ZK业务之间的隔离,需要给每个业务分配一个独立的命名空间,即指定一个ZK根路径:
CuratorFrameworkFactory.builder()
.connectString("domain.book.zookeeper:2181")
.sessionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();

2.3 创建节点

1
2
3
4
5
6
7
8
9
10
11
12
// CuratorFramework
public CreateBuilder create();

// CreateBuilder
public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded();

// CreateModable
public T withMode(CreateMode mode);

// PathAndBytesable<T>
public T forPath(String path, byte[] data) throws Exception;
public T forPath(String path) throws Exception;
  • 创建一个节点,初始内容为空

    1
    client.create().forPath(path);
  • 创建一个节点,附带初始内容

    1
    client.create().forPath(path, "init".getBytes());
  • 创建一个临时节点,初始内容为空

    1
    client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
  • 创建一个临时节点,并自动递归创建父节点

    1
    client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);

示例:

2.4 删除节点

1
2
3
4
5
6
7
8
9
10
// CuratorFramework
public DeleteBuilder delete();
// Versionable<T>
public T withVersion(int version);
// DeleteBuilder
// 客户端执行一个删除节点的操作,可能由于网络原因导致删除失败,在某些场景中这种异常是致命的,比如Master选举。当调用guaranteed(),会记录下这次失败的删除操作,只要客户端会话有效,就会在后台反复重试直到节点删除成功
public DeleteBuilderBase guaranteed();
// PathAndBytesable<T>
public T forPath(String path, byte[] data) throws Exception;
public T forPath(String path) throws Exception;
  • 删除一个节点:

    1
    2
    // 此接口只能删除叶子节点
    client.delete().forPath(path);
  • 删除一个节点,并递归删除其所有子节点:

    1
    client.delete().deletingChildrenIfNeeded().forPath(path);
  • 删除一个节点,强制指定版本进行删除:

    1
    client.delete().withVersion(version).forPath(path);
  • 删除一个节点,强制保证删除:

    1
    client.delete().guaranteed().forPath(path);

示例:

2.5 读取数据

1
2
3
4
5
6
// CuratorFramework
public GetDataBuilder getData();
// Statable<T>
public T storingStatIn(Stat stat);
// Pathable<T>
public T forPath(String path) throws Exception;
  • 读取一个节点的数据内容:

    1
    2
    // 返回值为byte[]
    client.getData().forPath(path);
  • 读取一个节点的数据内容,同时获取到该节点的stat:

    1
    2
    // Curator通过传入一个旧的stat变量的方式来存储服务端返回的最新的节点状态信息
    client.getData().storingStatIn(stat).forPath(path);

示例:

2.6 更新数据

1
2
3
4
5
6
7
// CuratorFramework
public SetDataBuilder setData();
// Versionable<T>
public T withVersion(int version);
// PathAndBytesable<T>
public T forPath(String path, byte[] data) throws Exception;
public T forPath(String path) throws Exception;
  • 更新一个节点的数据内容:

    1
    2
    // 返回一个stat对象
    client.setData().forPath(path);
  • 更新一个节点的数据内容,强制指定版本进行更新:

    1
    2
    // withVersion用来实现CAS,version通常由一个旧的stat对象获取到
    client.setData().withVersion(version).forPath(path);

示例:第一次使用最新的stat变量进行更新操作,第二次则使用了过期的stat变量。

2.7 异步接口

以上API皆为Curator的同步接口,Curator引入了 BackgroundCallback 接口,用来处理异步接口调用之后服务端返回的结果信息:

1
2
3
public interface BackgroundCallback {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
参数名 说明
client 当前客户端实例
event 服务端事件,CuratorEvent定义了ZK服务端发送到客户端的一系列事件参数

CuratorEvent中比较重要的两个参数:

  • 事件类型(CuratorEventType):getType() 代表本次事件的类型
    • CREATE:CuratorFramework#create()
    • DELETE:CuratorFramework#delete()
    • EXISXTS:CuratorFramework#checkExists()
    • GET_DATA:CuratorFramework#getData()
    • SET_DATA:CuratorFramework#setData()
    • CHILDREN:CuratorFramework#getChildren()
    • SYNC:CuratorFramework#sync()
    • GET_ACL:CuratorFramework#getACL()
    • WATCHED:Watchable#usingWatcher(Watcher) / Watchable#watched()
    • CLOSING:ZK客户端与服务端连接断开事件
  • 响应码(int):定义在org.apache.zookeeper.KeeperException.Code 中,常见的有:
    • 0(OK)接口调用成功
    • -4(ConnectionLoss)客户端与服务端连接断开
    • -110(NodeExists)指定节点已存在
    • -112(SessionExpired)会话已过期
1
2
3
4
5
6
7
// Backgroudable<T>
public T inBackground();
public T inBackground(Object context);
public T inBackground(BackgroundCallback callback);
public T inBackground(BackgroundCallback callback, Object context);
public T inBackground(BackgroundCallback callback, Executor executor);
public T inBackground(BackgroundCallback callback, Object context, Executor executor);

在ZooKeeper中,所有异步通知事件处理都由EventThread线程来处理,其串行处理机制在绝大部分应用场景下能够保证对事件处理的顺序性,但也有弊端就是一旦碰到一个复杂的处理单元,就会消耗过长的处理时间,从而影响到对其他事件的处理。因此接口会让用户传入一个Executor实例,把复杂的事件处理放到一个专门的线程池中,如 Executors.newFixedThreadPool(2)

示例:

2.8 典型使用场景

参考案例都在recipes包中,需要单独引入依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>

(1)事件监听

Curator引入了Cache来实现对ZK服务端事件的监听,Cache可以看作是一个本地缓存视图和远程ZooKeeper视图的对比过程,同时自动处理反复监听。Cache分为两种:节点监听(NodeCache)和子节点监听(PathChildrenCache)。

1
2
3
4
5
6
7
8
// NodeCache用于监听指定ZK数据节点本身的变化
public NodeCache(CuratorFramework client, String path);
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

// NodeCache定义了事件处理的回调接口NodeCacheListener
public interface NodeCacheListener {
public voidnodeChanged() throw Exception;
}
参数名 说明
client Curator客户端实例
path 数据节点的节点路径
dataIsCompressed 是否进行数据压缩

示例:

start() 有个布尔类型参数,默认为false,当设置为true时NodeCache首次启动会立即从ZK上读取对应节点的数据内容,并保存在Cache中。NodeCache不仅可以用于监听数据节点的内容变更,也能监听节点是否存在。若原本节点不存在,Cache会在节点创建后触发NodeCacheListener。

PathChildrenCache 用于监听指定ZK数据节点的子节点变化情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory);

public PathChildrenCache(CuratorFramework client, String path, boolean dataIsCompressed, ThreadFactory threadFactory);

public PathChildrenCache(CuratorFramework client, String path, boolean dataIsCompressed, final ExecutorService executorService);

public PathChildrenCache(CuratorFramework client, String path, boolean dataIsCompressed, final CloseableExecutorService executorService);

// 事件处理的回调接口,子节点发生变化时回调方法,事件类型包括:新增子节点、数据变更、子节点删除三类
public interface PathChildrenCacheListener {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}

示例:

(2)Master选举

分布式系统常见场景:一个复杂的任务,仅需从集群中选举出一台仅需处理即可。

大致思路:选择一个根节点,如 /master_select ,多台机器同时向该节点创建一个子节点 /master_select/lock 最终只有一台机器能创建成功,就相当于被选为Master。

监听器 LeaderSelectorListenerAdapter 需要自行实现,Curator会在成功获取Master权利后回调此监听器:执行完方法会立即释放权利,重新开始下一轮选举

(3)分布式锁

为了保证分布式环境的数据一致,需要在程序某个运行节点进行同步控制,比如用时间戳来生成流水号,以下示例展示这种典型的并发问题:

因为没有同步导致了上述的数据重复问题,可以使用Curator来实现分布式锁:

(4)分布式计数器

统计系统的在线人数,我们可以指定一个ZK数据节点作为计数器,多个应用实例在分布式锁的控制下,通过更新该数据节点来实现计数(可以直接使用封装的 DistributedAtomicInteger)。

(5)分布式Barrier

Barrier是一种用来控制多线程之间同步的经典方法,JDK自带CyclicBarrier实现:

多线程在并发情况下,都会准确的等待所有先都处于就绪状态后才开始同时执行其他业务逻辑,但分布式系统有多个JVM,此时可以使用Curator提供的DistributedBarrier:

上述为主线程触发Barrier释放不同,Curator还提供了另一种线程自发的触发Barrier释放:

(6)Curator提供的工具

较常用的有 ZKPaths 和 EnsurePath,前者可以来构建ZNode路径、递归创建和删除节点等:

后者提供了一种能确保数据节点存在的机制,有些场景我们要做某些操作必须先要确保节点存在,若不存在就要先创建节点,但分布式环境中可能有其他机器也在尝试创建该节点,就导致我们创建时得到“节点已存在”的异常。

EnsurePath采取了静默的节点创建方式:

(7)单元测试 TestingServer

引入依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.4.2</version>
</dependency>

示例:

(8)集群测试 TestingCluster

使用示例:


参考:

🔗 《从Paxos到Zookeeper-分布式一致性原理与实践》