ZooKeeper(五)技术内幕-服务器启动流程和Leader选举

ZooKeeper(五)技术内幕

五. 服务器启动

ZooKeeper服务端架构:

5.1 单机版服务器启动

ZK服务器启动步骤:

  1. 配置文件解析。
  2. 初始化数据管理器。
  3. 初始化网络I/O管理器。
  4. 数据恢复和对外服务。

单机模式下ZK服务器的启动流程图:

(1)预启动

  1. 统一由QuorumPeerMain作为启动类。

    单机/集群模式,在 zkServer.cmdzkServer.sh 两个脚本中都配置了 org.apache.zookeeper.server.quorum.QuorumPeerMain 作为启动入口类。

  2. 解析配置文件 zoo.cfg

    该文件配置了ZK运行时的基本参数,包括tickTime、dataDir和clientPort等参数。

  3. 创建并启动历史文件清理器 DatadirCleanupManager 。

    ZK从3.4.0版本后增加了自动清理历史数据文件的机制,包括对事务日志和快照数据文件进行定时清理。

  4. 判断当前是集群模式还是单机模式的启动。

    ZooKeeper根据步骤2解析出的集群服务器地址列表判断当前是单机还是集群模式。单机模式委托给ZooKeeperServerMain进行启动处理。

  5. 再次进行配置文件 zoo.cfg 的解析。

  6. 创建服务器实例 ZooKeeperServer 。

    org.apache.zookeeper.server.ZooKeeperServer 是单机版服务端最核心的实体类。ZK服务器会首先进行服务器实例的创建,然后对实例进行初始化,包括连接器、内存数据库和请求处理器等组件的初始化。

(2)初始化

  1. 创建服务器统计器 ServerStats 。

    ServerStats 是ZK服务器运行时的统计器:

  2. 创建ZooKeeper数据管理器 FileTxnSnapLog 。

    FileTxnSnapLog 是ZK上层服务器和底层数据存储之间的对接层,提供了一系列操作数据文件的接口,包括事务日志文件和快照数据文件。ZK根据 zoo.cfg 文件中解析出的快照数据目录 dataDir 和事务日志目录 dataLogDir 来创建 FileTxnSnapLog 。

  3. 设置服务器tickTime和会话超时时间限制。

  4. 创建ServerCnxnFactory。

    3.4.0版本引入Netty代替自实现的NIO框架,通过配置系统属性 zookeeper.serverCnxnFactory 来选择所使用的服务端网络连接工厂。

  5. 初始化ServerCnxnFactory。

    首先初始化一个Thread来作为ServerCnxnFactory的主线程,然后再初始化NIO服务器。

  6. 启动ServerCnxnFactory主线程。

    启动步骤5已经初始化的主线程ServerCnxnFactory的run方法,虽然此时NIO服务器已对外开放端口,客户端能够访问2181,但服务器实际无法处理客户端请求。

  7. 恢复本地数据。

    每次在ZK启动的时候,都需要从本地快照数据文件和事务日志文件中进行数据恢复。

  8. 创建并启动会话管理器。

    会话管理器 SessionTracker 负责服务端的会话管理,创建SessionTracker的时候,会初始化 expirationInterval、nextExpirationTime 和 sessionWithTimeout(用于保存每个会话的超时时间),同时还会计算出一个初始化的 sessionID 。

    SessionTracker初始化完毕,ZK就会立即开始会话管理器的会话超时检查。

  9. 初始化ZooKeeper的请求处理链。

    ZK的请求处理方式是典型的责任链模式的实现,在服务器上会有多个请求处理器一次来处理一个客户端请求。服务器启动时将这些请求处理器串联起来形成一个请求处理链。单机版包括 PrepRequestProcessor、SyncRequestProcessor 和 FinalRequestProcessor 三个请求处理器

  10. 注册JMX服务。

    ZK将服务器运行时的一些信息以JMX的方式暴露给外部。

  11. 注册ZooKeeper服务器实例。

    此前端口开放但不能处理客户端请求,因为网络层尚未能访问ZK实例。经过后续步骤后,ZK服务器已初始化完毕,只需注册给ServerCnxnFactory就可以对外提供正常的服务。

5.2 集群版服务器启动

集群模式下ZK服务器的启动流程图:

(1)预启动

  1. 统一由QuorumPeerMain作为启动类。

  2. 解析配置文件 zoo.cfg

  3. 创建并启动历史文件清理器 DatadirCleanupManager 。

  4. 判断当前是集群模式还是单机模式的启动。

    ZooKeeper根据步骤2解析出的集群服务器地址列表判断当前是单机还是集群模式。

(2)初始化

  1. 创建ServerCnxnFactory。

  2. 初始化ServerCnxnFactory。

  3. 创建ZooKeeper数据管理器 FileTxnSnapLog 。

  4. 创建QuorumPeer实例。

    QuorumPeer是集群模式特有的对象,是ZK服务器实例的托管者,从集群层面看,QuorumPeer代表了ZK集群中一台机器。运行期间,QuorumPeer 会不断检测当前服务器实例的运行状态,同时根据情况发起Leader选举。

  5. 创建内存数据库ZKDatabase。

    负责管理ZooKeeper的所有会话记录以及DataTree和事务日志的存储。

  6. 初始化QuorumPeer。

    一些核心组件注册到QuorumPeer中去,包括FileTxnSnapLog、ServerCnxnFactory 和 ZKDatabase 。同时ZK还会对QuorumPeer配置一些参数,包括服务器地址列表、Leader 选举算法和会话超时时间限制。

  7. 恢复本地数据。

  8. 启动ServerCnxnFactory主线程。

(3)Leader选举

步骤:

  1. 初始化Leader选举。

    ZK根据自身的SID(服务器ID)、lastLoggedZxid(最新ZXID)和当前服务器epoch(currentEpoch)来生成一个初始化的投票,每个服务器都投自己。

    然后ZK根据 zoo.cfg 的配置,创建对应的Leader选举算法实现。默认可选三种实现,使用 electionAlg 属性0~3指定:

    • LeaderElection:3.4.0版本废弃。
    • AuthFastLeaderElection:3.4.0版本废弃。
    • FastLeaderElection:当前唯一选择。

    初始化阶段,ZK首先创建Leader选举所需的网络I/O层QuorumCnxManager,同时启动对Leader选举端口的监听,等待集群中其他服务器创建连接。

  2. 注册JMX服务。

  3. 检测当前服务器状态。

    QuorumPeer是ZK服务器实例的托管者,其核心不断的检查当前服务器的状态并做出处理。正常情况下,ZK服务器状态在 LOOKING、LEADING和FOLLOWING/OBSERVING 之间切换。启动阶段,QuorumPeer的初始状态为LOOKING,此时进行Leader选举。

  4. Leader选举。

    一个集群中所有的机器相互之间进行一系列投票,选举最合适的机器成为Leader,其余则是Follower或Observer。集群中哪个机器处理的数据最新(根据服务器处理过的最大ZXID来比较新旧)就越有可能成为Leader。当所有服务器处理的ZXID一致,SID最大的会成为Leader。

(4)Leader和Follow启动期交互过程

Leader和Follower启动期交互过程步骤:

  1. 创建Leader服务器和Follower服务器。

    完成Leader选举后,每个服务器都会根据自己的角色创建对应的服务器实例,开始自己的主流程。

  2. Leader服务器启动Follower接收器LearnerCnxAcceptor。

    ZK集群运行期间,Leader服务器需要和其余服务器(Learner服务器)保持连接来确定其存活情况。LearnerCnxAcceptor负责接收所有非Leader服务器的连接请求。

  3. Learner服务器开始和Leader建立连接。

    所有Learner服务器启动完毕后,从投票结果得到Leader服务器信息,并与其建立连接。

  4. Leader服务器创建LearnerHandler。

    Leader接收到连接创建请求后,创建一个对应的LearnerHandler实例表示二者间的连接,并负责所有的消息通信和数据同步。

  5. 向Leader注册。

    建立好连接后,Learner向Leader注册,发送自己的LearnerInfo,包括服务器SID和处理的最新ZXID。

  6. Leader解析Learner信息,计算新的epoch。

    Leader根据解析出的SID和ZXID,获取对应的 epoch_of_learner ,与当前服务器的 epoch_of_leader 进行比较,如果Learner更大的话就更新Leader的epoch:epoch_of_leader = epoch_of_learner + 1

    然后LearnerHandler会进行等待,等到过半的Learner已经向Leader进行了注册,同时更新了 epoch_of_leader 之后,Leader就可以确定当前集群的epoch了。

  7. 发送Leader状态。

    计算出新的epoch后,Leader会将其以一个LEADERINFO消息的形式发送给Learner,同时等待其响应。

  8. Learner发送ACK信息。

    Follower收到LEADERINFO消息后,解析出epoch和ZXID,并向Leader反馈一个ACKEPOCH响应。

  9. 数据同步。

    Leader服务器接收到ACK后,开始与其进行数据同步。

  10. 启动Leader和Learner服务器。

    当过半的Learner已经完成了数据同步,Leader和Learner服务器的实例可以开始启动了。

Leader和Follower启动的步骤:

  1. 创建并启动会话管理器。
  2. 初始化ZK的请求处理链。
  3. 注册JMX服务。

六. Leader选举

6.1 概述

Leader选举是保证分布式数据一致性的关键所在。

(1)服务器启动时期的Leader选举

假设集群有三台机器,集群初始化阶段只有一台机器启动时还不能进行Leader选举,只有当第二台机器启动并能互相通信时,才能进入选举流程:

  1. 每个Server会发出一个投票。

    每台机器都投自己,每次投票包含最基本元素:所推举的服务器的myid和ZXID。

    所以结果是两个投票(1, 0)和(2, 0)分别发送给集群的其他机器。

  2. 接收来自各个服务器的投票。

    每个服务器收到投票后,先判断投票的有效性,包括检查是否为本轮投票、是否来自LOOKING状态的服务器。

  3. 处理投票。

    每一个投票,服务器都要将其与自己的投票进行PK,规则:

    • 优先检查ZXID,较大的服务器优先作为Leader。
    • ZXID相同时比较myid,较大的服务器优先作为Leader。

    所以步骤1的两个投票中后者myid较大,服务器1将自己的投票更新为(2, 0),服务器2则重复再发一次投票信息。

  4. 统计投票。

    每次投票后,服务器都会统计所有投票,判断是否已有过半的机器收到相同的投票信息。

  5. 改变服务器状态。

    一旦确定了Leader,每个服务器都会更新自己的状态,Follower更新为FOLLOWING,Leader更新为LEADING。

(2)服务器运行期间的Leader选举

当Leader所在服务器挂掉,会进入新一轮的Leader选举:

  1. 变更状态。

    Leader挂了,所有非Observer服务器将自己状态更新为LOOKING。

  2. 每个Server会发出一个投票。

    生成投票信息(myid, ZXID),此时服务器1和3产生投票(1, 123)和(3, 122),发送给各个机器。

  3. 接收来自各个服务器的投票。

  4. 处理投票。

    和启动阶段规则一致,服务器1会被选为Leader。

  5. 统计投票。

  6. 改变服务器状态。

6.2 算法分析

ZK提供了三种选举算法,在 zoo.cfg 的配置中使用 electionAlg 属性指定:

  • LeaderElection:数字0,纯UDP实现。
  • AuthFastLeaderElection:数字2,UDP版本,授权模式。
  • FastLeaderElection:数字1表示UDP版本,非授权模式;数字3表示,TCP版本,当前唯一选择。

(1)术语

  • SID(服务器ID):唯一标识一台机器,与myid的值一致。
  • ZXID(事务ID):唯一标识一次服务器状态的变更,集群中每台机器的不一定一致。
  • Vote(投票):当机器发现自己无法检测到Leader时,就会尝试开始Leader选举。
  • Quorum(过半机器数):集群中机器数为n,则 quorum = (n / 2 + 1)

(2)何时进入Leader选举

当某台机器出现任一情况:

  • 服务器初始化启动。
  • 服务器运行期间无法和Leader保持连接。

当一台机器进入选举流程时,集群可能处于两种状态之一:

  • 集群本身存在一个Leader。

    可能因为某台机器启动较晚,此时选举流程已结束,所以当其尝试选举Leader时会被告知当前Leader服务器的信息。机器仅需和Leader建立连接并同步状态即可。

  • 集群确实已不存在Leader。

(3)开始第一次投票

导致集群中不存在Leader:

  • 服务器刚初始化启动,尚未产生一台Leader服务器。
  • 运行期间Leader服务器挂掉。

此时集群所有机器都处于LOOKING状态,尝试寻找Leader。当一台机器处于此状态,会向集群其它机器发送投票消息(SID, ZXID)。

首次投票所有机器都会选举自己。

(4)变更投票

每台机器发送完自己的投票,也会接收到其它机器的投票。根据投票规则判断是否需要变更自己的投票。

术语:

  • vote_sid:接收到的投票中的服务器SID。
  • vote_zxid:接收到的投票中的服务器ZXID。
  • self_sid:服务器自己的SID。
  • self_zxid:服务器自己的ZXID。

规则:

  1. 如果vote_zxid大于self_zxid,认可当前收到的投票,再次将其发送出去代表自己新一轮的投票。
  2. 如果vote_zxid小于self_zxid,坚持自己的投票,不做任何变更。
  3. 如果vote_zxid等于self_zxid,对比二者的SID,如果vote_sid大于self_sid,认可当前收到的投票,再次将其发送出去代表自己新一轮的投票。
  4. 如果vote_zxid等于self_zxid,且vote_sid小于self_sid,坚持自己的投票,不做任何变更。

假设集群有5台机器,SID为(1,2,3,4,5),ZXID为(9,9,9,8,8),此时SID为2的机器为Leader,且1和2同时挂掉,由此开始Leader选举:

(5)确定Leader

经过第二次投票后,集群中每台机器都会再次收到其他机器的投票,并进行统计,当一台机器收到了超过半数相同的投票,其就称为新的Leader。

上述案例中quorum为3,只要收到3个即可。

6.3 实现细节

上述FastLeaderElection选举算法并不复杂,但实际实现有很多问题需要解决。

(1)服务器状态

org.apache.zookeeper.server.quorum.QuorumPeer.ServerState 类中列举了4种服务器状态:

  • LOOKING:寻找Leader状态,服务器在此状态时会认为集群没有Leader,需要进入Leader选举流程。
  • FOLLOWING:跟随者状态,表示当前机器为Follower。
  • LEADING:领导者状态,表示当前机器为Leader。
  • OBSERVING:观察者状态,表示当前机器为Observer。

(2)投票的数据结构

org.apache.zookeeper.server.quorum.Vote 类的数据结构:

(3)QuorumCnxManager-网络I/O管理者

ClientCnxn是ZK客户端中用于处理网络I/O的管理器,Leader选举中类似角色就是QuorumCnxManager,负责各台服务器之间的底层Leader选举过程中的网络通信。

其内部维护了一系列的队列,用来保存接收到的、待发送的消息,以及消息的发送器。每个队列都按SID分组形成队列集合,保证每台服务器之间互不干扰。

  • recvQueue:消息接收队列,存放从其他服务器接收到的消息。
  • queueSendMap:消息发送队列,存放待发送的消息。该Map按SID进行分组,为集群中每台机器分配一个单独的队列,保证机器之间的消息互不影响。
  • senderWorkerMap:发送器集合,每个SendWorker消息发送器,都对应一台远程ZooKeeper服务器,负责消息的发送。也按SID进行了分组。
  • lastMessageSent:最近发送过的消息,为每个SID保留最近发送过的一个消息。

ZK中所有机器都要两两创建网络连接,所以QuorumCnxManager启动时会创建一个ServerSocket监听Leader选举的通信端口(默认3888),服务器会不断接收到其它服务器的创建连接请求。这类TCP连接请求会交由receiveConnection方法进行处理

为了避免两条机器重复创建TCP连接,ZK设计了规则:只允许SID大的服务器主动和其他服务器建立连接,否则断开连接。所以receiveConnection方法中会比对自己和远程服务器的SID值,当自己更大时断开连接并主动发送连接请求,否则接受连接请求。

一旦连接建立,就会根据远程服务器的SID创建对应的消息发送器SendWorker和接收器RecvWorker并启动。

  • 消息接收:RecvWorker从对应TCP连接收到消息,将其放到recvQueue中。
  • 消息发送:SendWorker不断从queueSendMap对应消息队列取出一个消息发送,同时将其放入lastMessageSent记录。(注意:当消息发送队列为空时,会从lastMessageSent取出一个最近发送的消息再次发送,主要为了解决分布式问题:接收方在消息接收前或接收到消息后挂掉,导致消息还未被正确处理。当然ZK也保证了接收方对重复消息进行了正确的处理

(4)FastLeaderElection核心

术语:

  • 外部投票:其他服务器发来的投票。
  • 内部投票:服务器自身当前的投票。
  • 选举轮次:Leader选举的轮次,即logicalClock。
  • PK:指对内部和外部投票进行对比,来确认是否需要变更内部投票。

选票管理:

  • sendqueue:选票发送队列,保存待发送的选票。
  • recvqueue:选票接收队列,保存接收到的外部投票。
  • WorkerReceiver:选票接收器,不断从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换为一个选票,并保存到recvqueue。
    • 在选票接收过程中,如果发现其选举轮次小于当前服务器就直接忽略,并立即发送内部投票。
    • 当前服务器不是LOOKING状态,表示已选出Leader,同样直接忽略,并立即发送内部投票。
    • 接收到的消息来自于Observer,同样直接忽略,并立即发送内部投票。
  • WorkerSender:选票发送器,不断从sendqueue获取选票并将其传递给底层QuorumCnxManager。

选票管理与QuorumCnxManager在Leader选举中的关系:

Leader选举算法整体流程:

上述其实是 lookForLeader方法的内部逻辑:

  1. 自增选举轮次。

    FastLeaderElection的实现中有一个属性logicalclock,用于标识当前Leader的选举轮次,开始新一轮投票时会先将其自增。

  2. 初始化选票。

    开始新一轮投票前,每台服务器要先初始化自己的选票,也就是初始化Vote的各个属性。

  3. 发送初始化选票。

    完成选票的初始化后,就会发起第一次投票。选票被放入sendqueue中,由WorkerSender负责发送出去。

  4. 接收外部投票。

    每台服务器会不断从recvqueue中获取外部投票,当服务器发现自己无法获取任何外部投票,会立刻确认自己是否和集群其它服务器保持连接。若没有就立即建立连接,如已有则再次发送自己的内部投票。

  5. 判断选举轮次。

    发送完初始化选票,就要处理外部投票。需要根据选举轮次的不同进行不同的处理:

    • 外部投票的轮次大于内部投票:立即更新自己的选举轮次,并清空所有已经收到的投票,然后使用初始化投票进行PK来确定是否变更内部投票,最终再将投票发送出去。
    • 外部投票的轮次小于内部投票:直接忽略该投票,返回步骤4。
    • 外部投票的轮次等于内部投票:进行选票PK。
  6. 选票PK。

    FastLeaderElection.totalOrderPredicate 方法的核心逻辑。主要从选举轮次、ZXID和SID三个因素考虑:

    • 如果外部投票中被推举的Leader服务器的选举轮次大于内部投票,需要进行投票变更。
    • 选举轮次一致,则对比ZXID,如果外部投票较大就需要进行投票变更。
    • ZXID一致,则对比SID,如果外部投票较大就需要进行投票变更。
  7. 变更投票。

    选票PK后确定如果需要进行投票变更,要将外部投票的选票信息覆盖内部投票。变更完成后再将其发送出去。

  8. 选票归档。

    无论是否进行了投票变更,都要将收到的外部投票放入recvset中进行归档。其用于记录当前服务器在本轮次收到的所有外部投票,并会按SID进行区分,如 {(1, vote1),(2, vote2),...}

  9. 统计投票。

    完成选票归档后,可以开始投票的统计。一旦统计出有过半服务器认可了当前内部投票,就终止投票,否则返回步骤4。

  10. 更新服务器状态。

    当统计投票确定可以终止投票,开始更新服务器状态。如果判断过半服务器认可,就会更新为LEADING。否则根据具体情况更新为FOLLOWING或OBSERVING。步骤9到步骤10可能会有一段时间延迟来确认是否有新的更优的投票(默认200毫秒)。

七. 各服务器角色介绍

7.1 Leader

Leader的主要工作:

  • 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
  • 集群内部各服务器的调度者。

ZK使用责任链模式处理每个客户端请求,请求处理链:

事务请求:指会改变服务器状态的一系列请求,如创建节点、更新数据、删除节点以及创建会话等。

  • PrepRequestProcessor:请求预处理器,是第一个请求处理器。其能识别出当前客户端请求是否为事务请求。如果是则进行一系列预处理,如创建请求事务头、事务体、会话检查、ACL检查和版本检查等。

  • ProposalRequestProcessor:事务投票处理器,Leader服务器事务处理流程的发起者。

    • 其会将非事务请求流转到CommitProcessor,不再做其他处理;
    • 事务请求除了交给CommitProcessor外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follower服务器从而发起一次集群内事务投票。
    • 同时还会将事务请求交付给SyncRequestProcessor进行事务日志的记录。
  • SyncRequestProcessor:事务日志记录处理器,主要用来将事务请求记录到事务日志文件中,同时触发ZK进行数据快照。

  • AckRequestProcessor:负责在SyncRequestProcessor处理器完成事务日志记录后,向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录。

  • CommitProcessor: 事务提交处理器,使服务器都可以控制对事务请求的顺序处理。

    • 非事务请求,该处理器会直接将其交付给下一级处理器;
    • 事务请求,其会等待集群内针对Proposal的投票直到其可被提交。
  • ToBeCommitProcessor:特殊的处理器,有一个toBeApplied队列,专门用来存储已被CommitProcessor处理过的可被提交的Proposal。

    将这些请求逐个交付给FinalRequestProcessor进行处理,待其处理完毕后再从toBeApplied中移除。

  • FinalRequestProcessor:最后一个请求处理器,主要用来进行客户端请求返回之前的收尾工作,包括创建客户端请求的响应;针对事务请求,该处理器还会负责将事务应用到内存数据库中。

Leader服务器会与所有其他服务器创建一个TCP长连接,同时为每个服务器创建一个名为LearnerHandler的实体,是ZK集群中Learner服务器的管理器,负责Learner服务器与Leader服务器之间的一系列网络通信,包括数据同步、请求转发和Proposal提议的投票等。

7.2 Follower

主要工作:

  • 处理客户端非事务请求,转发事务请求给Leader服务器。
  • 参与事务请求Proposal的投票。
  • 参与Leader选举投票。

Follower也采用责任链模式组装的请求处理链来处理每一个客户端请求,但不需要负责事务请求的投票处理。

与Leader服务器有差异的两个处理器:

  • FollowerRequestProcessor:第一个请求处理器,主要工作是识别当前请求是否是事务请求。事务请求会将其转发给Leader服务器。
  • SendAckRequestProcessor:同样负责事务日志记录反馈的角色,完成日志记录后,会向Leader服务器发送ACK消息表示自己完成了事务日志的记录工作。与AckRequestProcessor的唯一区别是,后者与Leader服务器在同台服务器,因此其ACK反馈仅仅是一个本地操作;前者位于Follower服务器上,通过ACK消息的形式来向Leader服务器进行反馈。

7.3 Observer

主要工作:

  • 观察ZK集群的最新状态变化并将状态变更同步过来。
  • 与Follower一样,处理非事务请求,事务请求则转交给Leader服务器。但Observer不参与任何投票,包括事务请求Proposal的投票和Leader选举投票。
  • 只提供非事务服务,通常用于不影响集群事务处理能力的前提下提升集群的非事务处理能力。

虽然组装了SyncRequestProcessor处理器,但Leader服务器不会将事务请求的投票转发给Observer。

7.4 集群间消息通信

ZK的消息类型可以分为四类:

  • 数据同步型
  • 服务器初始化型
  • 请求处理型
  • 会话管理型

(1)数据同步型

数据同步型消息是指在Learner和Leader服务器进行数据同步时,网络通信所用到的消息。

(2)服务器初始化型

服务器初始化型消息是指整个集群或是某些新机器初始化的时候,Learner和Leader服务器之间相互通信所使用的消息类型。

(3)请求处理型

请求处理型消息是在请求处理的过程中,Learner和Leader服务器之间相互通信所使用的消息类型。

(4)会话管理型

会话管理型消息是ZK进行会话管理过程中,与Learner服务器之间进行通信使用的消息。


参考:

🔗 《从Paxos到Zookeeper-分布式一致性原理与实践》