ZooKeeper(四)应用场景与实现

ZooKeeper(四)应用场景与实现

一. 典型应用场景

ZooKeeper是一个高可用的基于发布/订阅模式的分布式数据管理和协调框架,基于ZAB算法,ZK可以很好的保证分布式环境的数据一致性。

1.1 数据发布/订阅

数据发布/订阅(配置中心):发布者将数据发布到ZK的一个或一系列节点上,订阅者进行订阅,从而动态的获取数据,实现配置信息的集中式管理和数据的动态更新

发布/订阅系统一般都包含两种设计模式:推和拉模式。推模式下服务端主动将数据更新发送给所有订阅的客户端,拉模式下则是由客户端主动发起请求获取最新数据(轮询)。ZK将推拉结合,客户端向服务端注册自己感兴趣的节点,当节点数据变更,服务端发送Watcher事件通知,客户端收到消息后主动获取最新数据

案例:系统中需要一些通用的配置信息,如机器列表信息、运行时的开关配置、数据库配置信息等,这些全局的配置信息通常有3个特性:

  • 数据量较小
  • 数据内容会在运行时发生动态变化
  • 集群中各机器共享,配置一致

单机系统通常会采用本地配置文件或内存变量(JMX来实现对系统运行时内存变量的更新)的方式,但对于分布式系统来说这两张方式比较难管理。

  • 配置存储:

    首先将初始化配置存放在ZK上,选取一个数据节点,如下:

    写入配置信息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    #DBCP
    dbcp.driverClassName=com.mysql.jdbc.Driver
    dbcp.dbJDBCUrl=jdbc:mysql://1.1.1.1:3306/taokeeper
    dbcp.characterEncoding=GBK
    dbcp.username=xiaoming
    dbcp.password=123456
    dbcp.maxActive=30
    dbcp.maxIdle=10
    dbcp.maxWait=10000
  • 配置获取:每个客户端启动时从ZK节点上读取配置信息,并且在该配置节点上注册一个数据变更的Watcher监听,一旦数据发生变更,所有订阅的客户端都能收到通知。

  • 配置变更:运行时可能要切换数据库,此时只需对ZK节点上的配置信息进行更改。

1.2 负载均衡

负载均衡即对多台机器、网络连接、CPU、磁盘驱动或其他资源进行分配负载,以达到优化资源使用、最大化吞吐率、最小化响应时间和避免过载的目的。

分布式系统具有对等性,为了保证系统的高可用性,通常采用副本的方式来对数据和服务进行部署,对于消费者而言需要在这些对等的服务提供方选择一个来执行业务逻辑。

典型的负载均衡如DNS,即域名系统(Domain Name System),可以看作是一个超大规模的分布式映射表,用于将域名和IP地址进行一一映射,方便用户使用域名来访问互联网站点。

我们只能注册有限的域名,当系统规模变大,很难通过统一的DNS配置来管理。可以使用本地HOST绑定来实现域名解析的工作,解决了域名紧张的问题,可以随时修改域名和IP的映射,但机器规模扩大时,系统上线前要在每台机器上绑定域名会非常不方便,如果需要临时更新域名还需要到每台机器上变更。

(1)动态DNS服务

  • 配置管理:

    首先要在ZK上创建一个节点进行域名配置:

    每个应用都可以创建一个属于自己的数据节点作为域名配置的根节点,配置清单:

    1
    2
    3
    4
    #单个IP:PORT
    192.168.0.1:8080
    #多个IP:PORT
    192.168.0.1:8080, 192.168.0.2:8080
  • 域名解析:

    传统DNS解析的工作由操作系统的域名和IP地址映射机制或域名解析服务器完成。而ZK实现的DDNS则需要每个应用自己从ZK数据节点上获取一份IP地址和端口的配置,并自行解析。每个应用都会在域名节点注册一个数据变更Watcher监听。

  • 域名变更:

    运行过程中要更新IP地址或端口时,只需对指定的域名节点进行更新操作,ZK会向订阅的客户端发送通知。

(2)自动化的DNS服务

上述实现在域名变更的环节还是需要人为的介入取修改域名节点上的IP地址和端口,ZK实现了自动化的DNS服务为了实现服务的自动化定位,系统架构如下:

重要的组件:

  • Register:集群负责域名的动态注册
  • Dispatcher:集群负责域名解析
  • Scanner:集群负责检测以及维护服务状态(探测服务的可用性、屏蔽异常服务节点等)
  • SDK:提供各种语言的系统接入协议,提供服务注册以及查询接口
  • Monitor:负责收集服务信息以及对DDNS自身状态的监控
  • Controller:后台管理的Console,负责授权管理、流量控制、静态配置服务和手动屏蔽服务等功能,运维人员可以在上面管理Register、Dispatcher、Scanner等集群。

自动化DNS流程:

  • 域名注册:

    服务提供者在启动过程把自己的域名信息注册到 Register 集群中去

    1. 服务提供者通过SDK提供的API接口,将域名、IP地址和端口发送给 Register 集群。
    2. Register 收到后根据域名将信息写入相应的ZK域名节点中。
  • 域名解析:与域名注册过程相反,服务消费者在使用域名时,会向 Dispatcher 发出域名解析请求。Dispatcher 收到请求后,会从ZK上指定域名节点读取相应的IP:PORT列表,通过一定的策略选取一个返回给前端应用。

  • 域名探测:

    指DDNS系统对域名下所有注册的IP地址和端口进行可用性检测/健康度检测,一般有两种方式:

    1. 服务端主动发起健康度心跳检测,需要在服务端和客户端之间建立起一个TCP长链接;
    2. 客户端主动向服务端发起健康度心跳检测。

    DDNS架构中的域名检测使用的是服务提供者主动定时向Scanner进行状态汇报的模式(第二种)。Scanner负责记录每个服务提供者最近一次的状态汇报时间,一旦超过5秒没有收到状态汇报就认定此IP和端口不可用,并进行域名清理过程。

    域名清理,Scanner会在ZK中找到对于域名节点,并将IP和端口配置从节点内容中移除。

1.3 命名服务

分布式系统中被命名的实体通常是集群中的机器、提供的服务地址或远程对象等,较常见的是一些分布式服务框架RPC或RMI中的服务地址列表,通过命名服务,客户端应用能够根据指定名字来获取资源的实体、服务地址和提供者的信息等。

  • Java的JNDI(Java命名与目录接口,Java Naming and Directory Interface)是一种典型的命名服务,标准的J2EE容器都提供了对JNDI规范的实现,可以用来完成数据源的配置和管理。
  • ZK的命名服务,上层应用只需一个全局唯一的名字,类似于唯一主键。

使用ZK来实现一套分布式全局唯一ID的分配机制:数据库的单库单表可以使用自带的 auto_increment 属性来自动为每条记录生成一个唯一ID,但随着数据库数据规模不断增大,分库分表后这个属性就只能针对单一表中记录自动生成ID。

UUID(通用唯一识别码,Universally Unique Identifier)在分布式系统中常用于作唯一标识元素标准,一个标准的UUID是一个包含32位字符和4个短线的字符串。

UUID的缺点:

  • 长度过长:生成的字符串过长,相比INT类型存储开销更大。
  • 含义不明:字符串本身没有含义。

可以通过ZK节点来生成全局唯一ID:

ZK中每个数据节点都能维护一份子节点的顺序队列,当客户端创建一个顺序子节点时,ZK会自动以后缀的形式在子节点上添加一个序号:

  1. 所有客户端根据自己的任务类型,在指定类型的任务下通过调用 create() 接口来创建一个顺序节点(如“job-”节点)。
  2. 节点创建完毕, create() 返回一个完整的节点名(如“job-0000000003”)。
  3. 客户端拿到返回值后,拼接上type类型(如“type2-job-0000000003”)作为全局唯一ID。

1.4 分布式协调/通知

分布式系统需要引入一个协调者(Coordinator)控制整个系统的运行流程(如分布式事务的处理或机器间的互相协调等)便于将分布式协调的职责从应用本身抽离,减少系统间的耦合性,提供可扩展性。

基于ZK实现的分布式协调/通知通常做法:不同的客户端都对ZK上同一个数据节点进行Watcher注册,监听数据节点的变化,如果发生变化,所有订阅的客户端都能收到并处理。

(1)MySQL数据复制总线-Mysql_Replicator

MySQL数据复制总线是一个实时数据复制框架,用于在不同的MySQL数据库实例之间进行异步数据复制合数据变化通知

系统是由一个MySQL数据库集群、消息队列系统、任务管理监控平台以及ZK集群等组件共同构成的一个包含数据生产者、复制管道和数据消费者等部分的数据总线系统:

ZK负责一系列的分布式协调工作,根据功能将数据复制组件划分为三个核心模块,每个模块作为独立的进程运行在服务端,运行时数据和配置信息保存在ZK上:

  • Core:实现了数据复制的核心逻辑,将数据复制封装成管道,并抽象出生产者和消费者两个概念,生产者通常是MySQL的Binlog日志。
  • Server:负责启动和停止复制任务。
  • Monitor:负责监控任务的运行状态,如果在数据复制期间发生异常或故障会告警。

(2)Core进行分布式协调的流程

  1. 任务注册:

    Core进程启动时,首先向 /mysql_replicator/tasks 节点(任务列表节点)注册任务。当注册过程发现子节点已存在,表示已有其他Task机器注册了该任务,以下为“复制热门商品”的任务:

  2. 任务热备份:

    • 复制组件采用热备份的容灾方式来应对复制任务可能会遇到的故障,即0将同一个复制任务部署在不同的主机上,主备机器通过ZK互相检测运行健康状况。
    • 实现热备方案,不管第一步是否创建了任务节点,每台任务机器都要在 /mysql_replicator/tasks/copy_hot_item/instances 节点上将自己的主机名注册上去。注册的是一个临时的顺序节点 /mysql_replicator/tasks/copy_hot_item/instances/[Hostname]-1 最后的序列号是临时顺序节点的精华。
    • 完成子节点创建后,每台机器都可以通过对比自己是否是所有子节点序号中最小的,如果是就将自己的运行状态置为 RUNNING,其余的机器则设置为 STANDBY,即小序号优先
  3. 热备切换:

    • 任务机器在完成状态标识后就可以工作了,RUNNING进行数据复制,STANDBY则进入待命状态,一旦RUNNING的机器出现故障,取一个待命状态最小的机器替补执行。
    • 所以待命的机器都要在 /mysql_replicator/tasks/copy_hot_item/instances 节点注册一个Watcher监听子节点列表变更,订阅所有任务机器的变化情况,RUNNING机器宕机与ZK断开连接后节点会消失,其余机器收到通知后开启新一轮的RUNNING选举。
  4. 记录执行状态:RUNNING机器需要将运行时的上下文保留给STANDBY机器,MySQL选择 /mysql_replicator/tasks/copy_hot_item/lastCommit 作为Binlog日志消费位点的存储节点, RUNNING定时写入当前信息。

(3)Server管理Core组件

  1. 控制台协调:Server的主要工作是通过ZK来对不同任务进行控制和协调,Server将每个复制任务对于生产者的元数据(库名、表名、用户名和密码等数据库信息以及消费者相关信息)以配置的形式写入任务节点 /mysql_replicator/tasks/copy_hot_item 使任务的所有机器都能能共享配置。

  2. 冷备切换:

    在一定规模的分布式项目中,往往有许多MySQL实例需要进行数据复制,每个实例都要对应一个复制任务,如果每个任务都进行双机热备份会消耗太多的机器。

    根据这个问题设计一种新的冷备份的方案,它对所有任务进行分组:

    Core进程被配置了所属Group,冷备份扫描

    • 假如一个Core进程被标记了group1,启动后会获取对应节点下所有的Task列表。
    • 如果找到了任务 copy_hot_item 会遍历instances节点,若没有子节点则创建一个临时的顺序节点 copy_hot_item/instances/[Hostname]-1
    • 类似于热备份的小序号优先,最小的Core进程标记为RUNNING,不同的是其他Core进程会自动将创建的子节点删除,然后继续遍历下一个Task节点
  3. 冷热备份对比:

    • 热备份:针对一个任务使用两台机器做备份,由于Watcher通知和临时顺序节点的特性,可以实时的进行协调;机器资源消耗较大。
    • 冷备份:使用了扫描机制,虽然降低了实时性,但节省了机器资源。

(4)机器间通信方式

常见分布式系统机器间通信方式:

  • 心跳检测:不同机器之间检测彼此是否在正常运行,使用ZK的临时节点实现而不同于常规的PING或TCP长连接。所有机器在ZK的一个指定节点下创建临时子节点,机器之间可以根据临时节点判断对应机器是否存活,这样机器之间不需要直接交互。
  • 工作进度汇报:任务分发到不同机器上执行后,需要实时的将自己的执行进度汇报给分发系统,每个任务客户端在指定节点下创建临时子节点:
    • 通过判断临时节点是否存在判断对应任务机器是否存活;
    • 各个任务机器实时的将自己的任务执行进度写入临时节点,中心系统定时的获取。
  • 系统调度:控制台将指令发送给所有客户端,实际是改ZK节点的数据,数据变更的通知再发送给客户端。

用ZK来实现分布式通信可以降低系统间的耦合和减少底层网络通信以及协议设计上的重复工作。

1.5 集群管理

集群管理包括:

  • 集群监控:侧重对集群运行时状态的收集
  • 集群控制:侧重对集群进行操作与控制

(1)传统Agent的弊端

传统基于Agent的分布式集群管理体系,每台机器部署一个Agent,负责主动向指定的监控中心汇报状态,来实现集群监控,其弊端:

  • 大规模升级困难:大规模升级的情况下,客户端形式的Agent非常麻烦。
  • 统一的Agent无法满足多样的需求:Agent可以满足如CPU使用率、负载、内存使用率、网络吞吐以及磁盘容量等基本物理状态的监控,但如果要深入到业务状态(比如监控每个消费者对消息的消费状态,或是分布式任务调度每台任务机器上的任务执行情况)就不适合由统一的Agent来提供。
  • 编程语言多样性:传统的Agent需要提供各种语言版的客户端。

(2)分布式日志收集系统

分布式日志收集系统要解决的问题:

  • 变化的日志源机器:每个应用的机器几乎每天都是变化的,可能是因为
  • 变化的收集器机器:日志收集系统自身也会有机器的变更和扩容。

流程:

  • 注册收集器机器:在ZK上创建一个节点作为收集器的根节点,每个收集器机器启动时都会在收集器节点下创建自己的节点。

  • 任务分发:待所有收集器都创建好自己的节点,根据子节点数目将所有日志源机器分为对应的若干组,然后将分组后的机器列表分别写到收集器创建的子节点上,每台收集器都可以从自己的节点上获取日志源机器列表。

  • 状态汇报:我们要考虑到机器有随时挂掉的可能,所以要有一个收集器的状态汇报机制,每个收集器机器在创建完自己的节点后还要再对应的子节点上创建一个状态子节点,定期向该节点写入自己的状态信息(类似于心跳检测)。

  • 动态分配:如果收集器挂掉或者扩容,需要动态的进行收集任务的分配。在运行过程中,日志系统始终关注着 /logs/collector 节点下所有子节点的变更,一旦检测到有收集器停止汇报或有新的收集器加入就会进行任务的重新分配。

    将之前分配的任务转移的方法:

    • 全局动态分配:系统根据新的收集器机器列表,将所有日志源机器重新进行分组,重新分配给当前所有收集器。
    • 局部动态分配:在小范围内进行任务的动态分配,每个收集器在汇报状态的同时也携带自己的负载(非CPU负载而是当前任务的综合评估),当收集器挂掉,就优先分配给负载较低的机器,新的机器加入时也会把负载较高的机器部分任务转移过去。

注意事项:

  • 节点类型:每个代表收集器的子节点如果是临时节点,当节点失效时会被立即删除,记录的日志源机器列表也会随之消失,所以应该是持久节点。
  • 日志系统节点监听:实际生产运行中,收集器更改状态节点的频率可能很高(小于1秒),且收集器数量会很大,日志系统如果要监听所有节点变化,对应的数据量太过庞大,所以日志系统并没有实时的接收每次节点状态变更,大部分都是没什么用的,而是采用主动轮询的方案,只是有一点延时。

(3)在线云主机管理

集群机器的监控,对机器的状态尤其是机器在线率统计有较高要求,并能对机器变更有快速的响应:

  1. 如何快速统计出当前生产环境一共有多少台机器?
  2. 如何快速获取到机器上下线情况?
  3. 如何实时监控集群中每台主机的运行时状态?
  • 机器上下线:新增机器时,先将指定的Agent部署到机器,部署启动后向ZK指定节点进行注册,创建一个临时子节点:

    创建完子节点后,对 /XAE/machines 节点关注的监控中心会收到“子节点变更”的事件,即上线通知;同理也可以收到下线通知。

  • 机器监控:运行过程中,Agent会定时将主机的运行状态信息写入ZK对应主机节点,监控中心通过订阅这些节点数据变更来获取运行时信息。

1.6 Master选举

分布式系统中,常常需要Master节点来协调其他单元,如一些读写分离的应用场景,客户端的写请求往往是由Master来处理的。

分布式环境常见场景:集群中所有系统单元需要对前端业务提供数据,如一个经过一系列海量数据处理计算得到的商品ID,让一台机器计算后共享给其他机器来减少重复劳动,系统可以分为客户端集群、分布式缓存系统、海量数据处理总线和ZooKeeper四个部分:

Client集群每台定时通过ZK完成Master选举,选出的Master进行一系列海量数据处理得到一个结果,将其放置在一个内存/数据库中,并通知其他客户端来获取。

我们对Master选举的需求是从所有机器中选择一台机器,通常情况下可以使用数据库主键来实现:所有机器插入一条相同主键的记录,成功的一台为Master。但这种方案没法应对Master挂掉的问题,因为关系型数据库无法通知这一事件。

利用ZooKeeper的强一致性,我们在分布式高并发场景下可以创建一个全局唯一的节点,所以我们可以实现:每天所有客户端定时创建一个临时节点 /master_election/2013-09-20/binding ,只有一个会成功创建,其就成为了Master,没有创建成功的再次节点上注册一个节点变更的Watcher,监控当前的Master机器是否存活,一旦挂掉就重新进行选举

1.7 分布式锁

不同主机共享一组资源,需要互斥手段防止彼此的干扰来保证一致性,此时使用分布式锁。关系型数据库本身具有排他性,来实现不同进程的互斥,但容易导致性能瓶颈。

(1)排他锁

排他锁(Exclusive Locks,即X锁)又叫写锁或独占锁,当事务A对数据对象O加排他锁,在持有锁期间只允许事务A对O进行读取和更新操作。

  • 定义锁:Java常用 synchronized 或 ReentrantLock 来定义锁,ZK没有类似的API来使用,而是直接用一个数据节点来表示一个锁(如 /exclusive_lock/lock

  • 获取锁:客户端通过调用 create() 接口在 /exclusive_lock 节点下创建临时子节点,最终只有一个可以创建成功,即获取到了锁,没有获取到的客户端在 /exclusive_lock 节点注册一个子节点变更的Watcher监听。

  • 释放锁:因为锁是一个临时节点,有两种情况会释放锁:

    • 当前获取锁的客户端集群宕机,ZK上此临时节点会被移除。
    • 正常执行完业务逻辑,客户端主动删除自己创建的临时节点。

    节点删除后,其他客户端收到通知再次发起分布式锁的获取,整个流程如下:

(2)共享锁

共享锁(Shared Locks,简称S锁)又叫读锁,事务A对数据对象O加上共享锁,其他事务只能对O进行读操作和加共享锁,直到所有共享锁释放。

  • 定义锁:直接用一个数据节点来表示一个锁(如 /shared_lock/192.168.0.1-R-000000001

  • 获取锁:客户端到 /shared_lock 节点下创建一个临时顺序节点,若当前是读请求,就创建R类节点;如果是写请求,就创建W类节点。

  • 判断读写顺序:

    1. 创建完节点后,获取 /shared_lock 节点下所有子节点,并注册Watcher变更监听。
    2. 确定自己的节点序号在所有子节点中的顺序。
    3. 读写两种情况:
      • 读请求:如果没有比自己序号小的子节点,或是小的节点都是读请求,表示自己已经获取到共享锁,同时开始执行读取逻辑。
      • 写请求:如果自己不是序号最小的子节点,就要进入等待。
    4. 接收到Watcher通知后,重复步骤1。
  • 释放锁:流程同排他锁。

    整个流程如下:

羊群效应

上述共享锁实现可以满足机器规模较小的场景(10台以内),当机器规模较大时,针对判断读写顺序步骤结合下图实例,猜想实际运行的情况。

实际运行:

  1. 192.168.0.1机器首先进行读操作,完成后将首个子节点删除。
  2. 余下4台机器均受到节点移除的通知,重新从 /shared_lock 节点上获取一份新的子节点列表。
  3. 每个机器判断自己的读写顺序,其中192.168.0.2这台机器检测到自己已经是最小机器了,于是开始读操作,而其他机器发现未轮到自己就继续等待。
  4. 继续….

其中步骤3,该通知发给了所有机器,但实际上只对192.168.0.2这台机器有用,整个流程大量的Watcher通知和子节点列表获取操作重复运行,大部分机器判断的结果都是需要继续等待。在集群规模较大的场景下,不仅会对ZooKeeper服务器造成巨大的性能影响和网络冲击,更为严重的是,若同一时间有多个节点对应的客户端完成事务或是事务中断引起节点消失,ZK服务器就会在短时间内向其余客户端发送大量的事件通知—即羊群效应

1
经济学里经常用“羊群效应”来描述经济个体的从众跟风心理。“羊群效应”就是比喻人都有一种从众心理,从众心理很容易导致盲从,而盲从往往会陷入骗局或遭到失败。

产生问题的原因在于没有找准客户端真正的关注点,每个客户端只需关注比自己小的那一个节点的变更情况即可:

  1. 客户端调用 create() 方法创建一个类似于 /shared_lock/[hostname]-请求类型-序号 的临时顺序节点。
  2. 客户端调用 getChildren() 接口来获取所有已经创建的子节点列表,这里不再注册任何Watcher。
  3. 如果无法获取共享锁,就调用 exist() 来对比自己小的那个节点注册Watcher,读写请求情况不一样:
    • 读请求:向比自己序号小的最后一个写请求节点注册
    • 写请求:向比自己序号小的最后一个节点注册
  4. 等待Watcher通知,继续进入步骤2。

1.8 分布式队列

分布式队列如常见的消息队列:ActiveMQ、Metamorphosis、Kafka和HornetQ等。分为两大类:常规的先入先出队列、等到队列元素集聚后才统一安排的Barrier模型。

(1)FIFO-先入先出

使用ZooKeeper实现FIFO队列类似于全写的共享锁模型,所有客户端到 /queue_fifo 节点下创建一个临时顺序节点。

创建完节点后,确定执行顺序的步骤:

  1. 通过调用 getChildren() 接口来获取 /queue_fifo 节点下所有的子节点,也就是获取队列中所有的元素。
  2. 确定自己的节点序号在所有子节点中的顺序。
  3. 如果自己不是序号最小的子节点,进入等待,同时向比自己序号小的最后一个节点注册Watcher监听。
  4. 接收到Watcher通知,重复步骤1。

(2)Barrier:分布式屏障

Barrier原意障碍物,在分布式系统中指系统间的一个协调条件,规定一个队列的元素必须都集聚后才能统一进行安排,否则一直等待。

开始时,/queue_barrier 节点是一个已存在的默认节点,并将其节点的数据内容赋值为一个数字n来代表Barrier值,当子节点数达到n后才能打开Barrier,所有客户端都要到此节点下创建一个临时节点。

创建完节点后,确定执行顺序的步骤:

  1. 通过调用 getData() 接口获取 /queue_barrier 节点的数据内容:10。
  2. 通过调用 getChildren() 接口获取 /queue_barrier 节点下所有子节点,即获取队列中所有元素,同时注册对子节点列表变更的Watcher监听。
  3. 统计子节点的个数。
  4. 如果子节点个数不足10个,需要进入等待。
  5. 接收到Watcher通知后,重复步骤2。

二. 大型分布式系统的应用

ZK因为便捷的使用方式、卓越的运行性能和良好的稳定性,已经被广泛的应用在越来越多的大型分布式系统中,用来解决如配置管理、分布式通知/协调、集群管理和Master选举等一系列分布式问题。

2.1 Hadoop

Hadoop是Apache开源的一个大型分布式计算框架,定义了一种能够开发和运行处理海量数据的软件规范,用来实现一个在大规模集群中对海量数据进行分布式计算的软件平台。核心是HDFS和MapReduce,分别提供了对海量数据的存储和计算能力。0.23.0版本后又引入全新一代MapReduce框架YARN。

在Hadoop中,ZooKeeper注意用于实现HA(High Availability,高可用)集中于Hadoop Common中的HA模块,HDFS的NameNode与YARN的ResourceManager都是基于此HA模块实现自己的HA功能,YARN还用ZooKeeper来存储应用的运行状态。

(1)YARN

YARN是Hadoop为了提高计算节点Master的扩展性,同时为了支持多计算模型和提供资源的细粒度调度而引入的全新一代分布式调度框架。

YARN由四部分组成:

  • ResourceManager(RM):核心模块,全局资源管理器,负责整个系统的资源管理和分配,同时接收各个节点(NodeManager)的资源汇报信息,并将信息按照一定的策略分配给各个应用程序(ApplicationMaster)。其内部维护了各个应用程序的ApplicationMaster信息、NodeManager信息以及资源使用信息等。
  • NodeManager(NM)
  • ApplicationMaster(AM)
  • Container

(2)ResourceManager单点问题

上述架构可以明显看出存在一个问题:ResourceManager单点问题。YARN设计了一套 Active/Standby 模式的 ResourceManager HA 架构

运行期间有多个RM并存,只有一个处于Active状态,另外则处于Standby状态,当Active节点无法正常工作时,其余节点通过竞争选举出新的Active节点。

YARN使用基于ZooKeeper实现的ActiveStandbyElector组件来确定RM的状态,从而实现多个RM的主备切换:

  1. 创建锁节点:ZK上会有一个类似于 /yarn-leader-election/pseudo-yarn-rm-cluster 的锁节点,所有RM在启动时,都会去竞争写一个Lock子节点 ../ActiveStandbyElectorLock ,该子节点类型是临时节点,创建成功的RM切换为Active状态,没有的切换为Standby状态。
  2. 注册Watcher监听:Standby状态的RM向 ../ActiveStandbyElectorLock 节点注册一个Watcher监听节点变更。
  3. 主备切换:当Active状态的节点出现异常情况,其创建的Lock节点会随之删除,所有Standby状态的RM收到通知后重复步骤1操作。

HDFS中的NameNode和ResourceManager都使用该组件来实现各自的HA。

(3)Fencing隔离

分布式环境中,机器会由于网络闪断或自身负载过高(GC占用时间过长或CPU负载过高)导致无法正常的对外进行及时响应,即“假死”的情况。当Active状态的RM1机器发生假死情况,主备切换后RM2成为Active状态,而RM1恢复正常后依然会认为自己处于Active状态,即分布式脑裂现象(Brain-Split)。

Fencing机制利用ZK数据节点的ACL权限控制机制来实现不同RM之间的隔离,多个RM之间通过竞争创建锁节点来实现主备状态的确定,改进为创建的根节点必须携带ZK的ACL信息,目的是为了独占该根节点,防止其他RM对节点进行更新。

当RM1出现假死后,ZK会将其创建的锁节点移除,RM2创建相应的锁节点并切换为Active状态。RM1恢复之后会试图更新ZK的相关数据,但发现没有权限,说明当前节点不是自己创建的,于是自动切换回Standby状态。

(4)ResourceManager状态存储

在ResourceManager中,RMStateStore能够存储一些RM的内部状态信息,包括Application和它们的Attempts信息、Delegation Token及Version Information等,大部分状态信息因为很容易从上下文信息中重构出来所以不需要持久化存储,所以存储的设计方案有:

  • 基于内存实现,一般用于日常开发测试。
  • 基于文件系统实现,如HDFS。
  • 基于ZooKeeper实现。

Hadoop官方建议使用ZooKeeper来实现,存放在 /rmstore 节点下,RMAppRoot节点下存放的是与各个Application相关的信息,RMDTSecretManagerRoot存放的是安全相关的Token等信息:

每个Active状态的RM在初始化阶段会从ZK上读取这些状态信息。

2.2 HBase

HBase,即 Hadoop Database,是 Google Bigtable 的开源实现,基于Hadoop文件系统设计的面向海量数据的高可靠性、高性能、面向列、可伸缩的分布式存储系统。

与大部分分布式NoSQL数据库不同的是,HBase针对数据写入具有强一致性的特性,甚至包括索引列也实现了强一致性。

HBase在实现上遵守了 Google BigTable 论文的设计思想,BigTable使用Chubby来负责分布式状态的协调,Chubby是Google实现的一种基于Paxos算法的分布式锁服务,HBase则采用了开源的ZooKeeper服务来完成对整个系统的分布式协调工作。

(1)系统冗错

启动时,每个RegionServer服务器都会到ZK的 /hbase/rs 节点下创建一个信息节点(rs状态节点),同时HMaster对这个节点注册监听。某个RS挂掉,ZK因为一段时间无法收到心跳信息,而删除掉其对应的rs状态节点。

同时HMaster收到ZK的NodeDelete通知,从而感知到某个节点断开,并立即开始冗错工作,将该RS处理的数据分片Region重新路由到其他节点上,并记录到Meta信息中供客户端查询。

HMaster不直接通过心跳机制管理RS的状态,因为会随着系统容量的不断增加其管理负担会越来越重。

(2)RootRegion管理

数据存储的位置信息记录在元数据分片RootRegion,每次客户端发起请求要知道数据的位置,先要查询RootRegion(存储在ZK上的 /hbase/root-region-server 节点),RootRegion发生变化,如Region的手工移动、Balance或RootRegion所在服务器发生故障,能够通过ZK来感知变化并执行容灾措施,从而保证客户端总能拿到正确的RootRegion信息。

(3)Region状态管理

Region是HBase中数据的物理切片,每个Region记录全局数据的一小部分,不同Region间数据不重复。但对于分布式系统来说,Region是会经常因为系统故障、负载均衡、配置修改、Region分裂和合并等发生变更。一旦Region移动就会尽力Offline和重新Online的过程。

Offline期间数据不能被访问,并且Region的状态变化需要让全局知晓,否则可能导致某些事务性异常。对于HBase集群,Region的数量可能达到10万级别,所以需要依赖ZK来做到。

(4)分布式SplitLog任务管理

某台RegionServer服务器挂掉时,总有一部分新写入的数据还没有持久化到HFile中,因此迁移该RegionServer服务时要从HLog中恢复这部分还在内存中的数据,最关键的一步是SplitLog,即HMaster需要遍历该RegionServer服务器的HLog,并按Region切分成小块移动到新的地址下,再进行数据的Replay。

单个RegionServer的日志量相对庞大(数千个Region,上GB的日志),但又需要系统快速的完成日志的恢复工作,一个可行的方案是将处理HLog的任务分配给多台RegionServer服务器来共同处理,需要一个持久化组件辅助HMaster完成任务的分配。

HMaster会在ZK上创建一个 /hbase/splitlog 的节点,将哪个RegionServer处理哪个Region这样的信息以列表的形式存放在该节点,各个RegionServer服务器自行到该节点上领取任务并在任务执行成功或失败后更新该节点的信息,以通知HMaster继续进行后续步骤。ZK负责分布式集群中相互通知和信息持久化的角色。

(5)Replication管理

HBase通过Replication实现实时的主备同步,从而拥有了容灾和分流等关系型数据库拥有的功能。与传统Replication不同的是,HBase中Replication是多对多的,且每个节点随时都有可能挂掉。

在ZK上记录一个 /hbase/replication 节点,然后把不同的RegionServer服务器对应的HLog文件名称记录到相应的节点上,HMaster集群会将新增的数据推送给Slave集群,并同时将推送信息记录到ZK上(断点记录),重复以上过程。服务器挂掉时,因为ZK上已经保存了断点信息,只要用这些信息协调推送HLog数据的主节点服务器就可以继续复制。

(6)ZooKeeper部署

HBase的启动脚本(hbase-env.sh)中可以选择由HBase启动其自带的默认ZK还是使用一个已有的外部ZK集群。一般建议后者,可以方便多个HBase复用一套ZK集群,但要为每个HBase集群明确指定对应的ZK根节点配置(配置项zookeeper.znode.parent)确保HBase集群间互不干扰。

对应HBase客户端只要指定ZK的集群地址和对应HBase根节点配置即可。HBase集群启动时会在ZK上逐个添加对应的初始化节点,并在HMaster以及RegionServer进程中进行相应节点的Watcher注册。

2.3 Kafka

Kafka是由社交公司LinkedIn于2010年12月份开源的分布式消息系统,由Scala语言开发,广泛运用在Twitter、Netflix和Tumblr等大型互联网站点上。

  • 主要用于实现低延迟的发送和收集大量的事件和日志数据(活跃数据,如网站的PV数和用户访问记录),这些数据以日志的形式记录下来,然后由一个专门的系统进行日志的收集与统计。
  • 吞吐量极高,整体设计是典型的发布与订阅模式系统。集群中所有服务器都是对等的,可以在不做任何配置更改的情况下实现服务器的添加与删除,消息的生产者和消费者也能做到随意重启和机器上下线。

术语

  • 消息生产者:即Producer,生成消息并发送到Kafka服务器上。
  • 消息消费者:即Consumer,负责消费Kafka服务器上的消息。
  • 主题:即Topic,由用户定义并配置在Kafka服务端,用于建立生产者和消费者之间的订阅关系:生产者发送消息到指定topic下,消费者从该topic消费消息。
  • 消息分区:即Partition,一个topic下分多个分区,如topic1分10个分区,由两台服务器提供,每台5个分区,服务器ID为0和1,分区为0-0到0-4和1-0到1-4。消息分区机制和分区数量与消费者的负载均衡机制有很大关系。
  • Broker:即Kafka的服务器,用于存储消息。
  • 消费者分组:即Group,用于归组同类消费者,多个消费者可以共同消费一个topic下的消息,这些消费者组成一个分组/集群。
  • Offset:消息存储在Kafka的Broker上,消费者拉取消息数据的过程需要在文件中的偏移量。

Broker注册

Broker、Producer和Consumer是分布式部署,Broker虽然相互独立运行但需要一个注册系统统一管理所有节点。ZK上配置一个Broker节点 /broker/ids ,每个Broker服务器启动时都先会到ZK上进行注册 /broker/ids/[0...N] 一个临时节点。

使用全局唯一ID(Broker ID)来指代每台Broker服务器,创建完节点后每个Broker会将自己的IP地址和端口等信息写入此节点。

Topic注册

Kafka中将同一个Topic的消息分为多个分区并分布到多个Broker上,这些分区信息和Broker的对应信息也由ZK维护,记录在节点 /brokers/topics ,即Topic节点。每个Topic都会对应一个节点 /brokers/topics/[topic]

Broker服务器启动后,会到对应的Topic节点下注册自己的Broker ID,并写入针对该Topic的分区总数 。如临时节点 /brokers/topics/login3->2 表明Broker ID为3的一台服务器对应,login这个主题的消息提供了2个分区进行消息存储。

生产者负载均衡

因为一个Topic会进行分区并分布到不同的Broker服务器,所以生产者要合理的将消息发送到对应的Broker上,如何进行生产者的负载均衡?

  • 四层负载均衡:
    • 根据生产者的IP地址和端口为器确定一个相关联的Broker,一个生产者对应一个Broker,其消息都发送给此Broker。
    • 优点:整体架构简单,每个生产者只需维护和Broker的单个TCP链接。
    • 缺点:无法做到真正的负载均衡,生产者也无法实时感知到Broker的新增和删除,也就无法做到动态的负载均衡。
  • ZooKeeper负载均衡:
    • 生产者通过Broker节点的变化动态感知到Broker服务器列表的变更,生产者注册对“Broker增加与减少”、“Topic的新增与减少”和“Broker与Topic关联关系的变化”等事件注册Watcher监听。
    • 开发人员可以控制生产者根据一定规则来进行数据分区,而不仅仅是随机算法。即语义分区。

消费者负载均衡

每个消费者分组都会分配一个全局唯一的Group ID,每个消费者分配一个全局唯一的Consumer ID,格式为 Hostname:UUID

ZK记录消息分区与消费者关系

每个消息分区有且只能同事有一个消费者进行消息的消费。所以要在ZK上记录下消息分区与消费者之间的对应关系。当消费者确定了对一个分区的消费权利,要将其Consumer ID写入对应分区的临时节点上,如 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] 最后即消息分区的标识,节点内容就是消费该分区上消息的消费者的Consumer ID。

ZK记录消费进度Offset

消费者对分区进行消费时,要定时的将分区消息的消费进度/Offset(节点为 /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] ,内容为Offset值)记录到ZK上去,以便该消费者重启或其他消费者重新接管该消息分区消费后,能够从之前的进度开始继续消费。

消费者注册

消费者服务器初始化启动时加入消费者分组的过程:

  1. 注册到消费者分组Group:
    • 消费者启动时,在ZK注册自己的临时节点 /consumers/[group_id]/ids/[consumer_id]
    • 完成创建后,消费者将自己订阅的Topic信息写入该节点。
  2. 对Group中消费者的变化注册监听:每个消费者会关注所属Group中消费者服务器的变化情况,对 /consumers/[group_id]/ids 节点注册子节点变化的Watcher监听,当消费者发生变化,会触发消费者的负载均衡。
  3. 对Broker服务器的变化注册监听:消费者对 /broker/ids/[0...N] 中的节点进行监听注册,当Broker服务器列表发生变化,会根据具体情况决定是否需要进行消费者的负载均衡。
  4. 进行消费者负载均衡:让同一个Topic下不同分区的消息尽量均衡的呗多个消费者消费,当一个Group组内消费者服务器发生变更,或Broker服务器发生变更会触发消费者负载均衡。
负载均衡

一个消费者分组中每个消费者记为C1,C2,…,Ci ,…. ,CG 。对于消费者Ci其对应的消息分区分配策略为:

  1. 设置PT为指定Topic所有的消息分区。
  2. 设置CG为同一个消费者分组中所有的消费者。
  3. 对PT进行排序,使分布在同一个Broker服务器上的分区尽量靠在一起。
  4. 对CG进行排序。
  5. 设置i为Ci在CG中位置的索引值,同事hi设置 N = size(PT) / size(CG) 。
  6. 将编号为 i*N~(I+1)*N-1 的消息分区分配给消费者Ci
  7. 重写更新ZooKeeper上消息分区与消费者 Ci 的关系。

三. 阿里巴巴的应用实践

未完待续。

3.1 消息中间件-Metamorphosis

3.2 RPC服务框架-Dubbo

3.3 基于MySQL Binlog的增量订阅和消费组件-Canal

3.4 分布式数据库同步系统-Otter

3.5 轻量级分布式通用搜索平台-终搜

3.6 实时计算引擎-JStorm


参考:

🔗 《从Paxos到Zookeeper-分布式一致性原理与实践》