RabbitMQ(八)高阶

RabbitMQ(八)高阶

一. 存储机制

1.1 消息如何存储

消息:

  • 持久化消息:到达队列时写入磁盘,允许的话在内存保留备份,内存吃紧时清除。
  • 非持久化消息:一般保存在内存,内存吃紧时转移到磁盘。

消息进入磁盘的处理在持久层完成,持久层包含:

  • 队列索引(rabbit_queue_index):负责维护队列中落盘消息的信息,包括消息的存储地点、是否己被交付给消费者、是否己被消费者 ack 等。每个队列都有一个对应的队列索引。

    • 存储方式:以顺序(文件名从0开始累加)的段文件来进行存储,后缀为 .idx ,每个段文件包含固定的 SEGMENT_ENTRY_COUNT 条记录,默认为16384。
  • 消息存储(rabbit_msg_store):以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。

    可以分为:

    • msg_store_persistent:负责持久化消息的持久化,重启后消息不会丢失;
    • msg_store_transient:负责非持久化消息的持久化,重启后消息会丢失。

    存储方式:经过消息存储处理的消息都会以追加的方式写入到文件中,大小超过指定的限制 file_size_limit 后,关闭文件再创建一个新文件。在进行消息的存储时, 会在 ETS (Erlang Term Storage)表中记录消息在文件中的位置映射(Index)和文件的相关信息(FileSummary)

消息(包括消息体、属性和 headers )可以直接存储在队列索引中,也可以被保存在消息存储中。一般按消息大小来划分,小消息放入前者,大消息放入后者。通过 queue_index_embed_msgs_below 来配置,默认为4096B(指消息体+属性+headers的整体大小)。

读取消息流程:

  • 先根据消息的 ID(msg_id)找到对应存储的文件,
  • 如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息的内容。
  • 如果文件不存在或者被锁住了, 则发送请求由 rabbit_msg_store 进行处理。

删除消息只是从ETS表删除相关信息,同时更新对应的存储文件,所以不会立即对文件中的消息进行删除,只是标记为垃圾数据(类似GC?)。

触发垃圾回收的条件:

  • 一个文件全是垃圾数据,会删除此文件。
  • 检测到前后两个文件(相邻)的有效数据可以合并在一个文件中,且所有垃圾数据的大小与文件总大小比值超过设定阈值 GARBAGE_FRACTION(默认0.5)时,触发垃圾回收,将文件合并。

合并时先锁定两个文件,先整理前个文件的有效数据,再将后面文件的有效数据写入前个文件,同时更新ETS表的记录,最后删除后面的文件

如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。而当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。

1.2 队列的结构

1.2.1 消息在队列中有四种状态

消息在队列中存在,其状态可能会不断发生变化:

  • alpha :消息内容(包括消息体、属性和 headers )和消息索引都存储在内存中。
  • beta :消息内容保存在磁盘中,消息索引保存在内存中。
  • gamma :消息内容保存在磁盘中,消息索引在磁盘和内存中都有。
    • 只有持久化消息会处于此状态;
    • durable 属性设置为 true 的消息一定进入此状态;
    • 在开启 publisher confirm 机制时, 只有到了 gamma 状态时才会确认该消息己被接收,若消息消费速度足够快,内存也充足,这些消息也不会继续走到下一个状态。
  • delta :消息内容和索引都在磁盘中。需要两次I/O操作才能读到消息,一次读消息索引(rabbit_queue_index),一次读消息内容(rabbit_msg_store)。前面状态则只需一次,直接读rabbit_msg_store,因为索引在内存中,而delta的索引在磁盘中。

状态转换:

  • 在运行时可以根据消息的传送速度计算一个当前内存能够保存的最大消息数量 target_ram_count ,当 alpha 的消息数量大于此值时,会把多余的消息转为后续状态。
  • 状态机制为了满足不同的内存和CPU需求,alpha 耗内存,但不怎么耗CPU;而如 delta 不消耗内存,但会消耗很多CPU和I/O操作。

1.2.2 队列子结构

队列结构包括:

  • rabbit_amqqueue_process:负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的 confirm 和消费端的 ack )等。
  • backing_queue:是消息存储的具体形式和引擎,并向 rabbit_amqqueue_process 提供相关的接口以供调用。
    • 默认实现:rabbit_variable_queue,通过5个子队列Q1、Q2、Delta、Q3和Q4实现各个消息状态。

队列结构如图:Q1和Q4只包含alpha状态,Delta只包含delta状态,Q2和Q3包含beta和gamma状态;所以消息按这个顺序流动就是从内存到磁盘再回内存的过程,能够适应负载升高和降低的变化。

1.2.3 从子队列消费消息

消费者获取消息:

  • 尝试从Q4获取,成功则返回;
  • 否则尝试从Q3获取,此时判断Q3是否为空,为空表示队列无消息;
  • Q3不为空则取出消息,再判断此时Q3和Delta的长度:
    • 若都为空,可以认为Q2到Q4都为空,将Q1消息转移到Q4方便下次读取。
    • 若Q3为空,Delta不为空,将消息转移至Q3,方便下次获取。转移的过程按索引分段读取。
  • 为什么Q3为空可以认为整个队列为空?因为若Q3为空时,Delta不为空,在取Q3取最后一条消息时必然会将Delta的消息转移过来,所有前面的子队列都可以类推。

1.2.4 为什么消息堆积很棘手

当负载较高时,消息不能很快被消费掉,就会进入到比较靠后的队列中,这就增加了处理每个消息的平均开销。因为要花更多的时间和资源来处理堆积的消息,自然处理新流入的消息的能力就会降低,新流入的消息又积压,这就构成了负循环,使情况越来越差。

处理方案

  • 增加 prefetch_count 的值,即一次发送多条消息给消费者,加快消息被消费的速度
  • 采用 multiple ack ,降低处理 ack 带来的开销
  • 流量控制

1.3 惰性队列

RabbitMQ 在 3.6.0 版本引入了惰性队列,惰性队列会尽可能地将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中

  • 设计目标:能够支持更长的队列,即支持更多的消息存储
  • 使用场景:当消费者由于各种各样的原因(比如消费者下线、宕机或者由于维护而关闭等)致使长时间内不能消费消息而造成堆积时,惰性队列就很有必要了。

1.3.1 为什么要使用惰性队列?

默认情况下,消息生产到队列后会尽可能的存储到内存,以便更快速的发送给消费者。即使是持久化的消息,也在存入磁盘的同时驻留在内存一份备份。

内存占用过高时,需要释放内存将消息转移到磁盘,这一操作会耗费很长的时间,并且阻塞队列的操作,导致无法接收新的消息。

惰性队列则会将消息直接存到文件系统中,从而减少了内存的损耗。但相应的增加了I/O的使用,但其实对于持久化消息,I/O操作是无法避免的,所以持久化消息和惰性队列是绝配

1.3.2 使用惰性队列

可以在声明队列时设置惰性:

1
2
3
4
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
// 可以在声明队列时设置惰性
channel.queueDeclare("myqueue", false, false, false, args);

也可以通过策略Policy的方式设置:(优先级大于前者)

1
$ rabbitmqctl set_policy Lazy "^myqueue$" '{"queue-mode":"lazy"}' --apply-tO queues 

1.3.3 惰性与普通队列的性能差距

假如发送1千万条消息,每条消息的大小为1KB ,并且此时没有任何的消费者,那么普通队列会消耗 1.2GB 的内存,而惰性队列只消耗 1.5MB 的内存。

对于普通队列,如果要发送1千万条消息,需要耗费 801 秒,平均发送速度约为 13000 条/秒。如果使用惰性队列,那么发送同样多的消息时,耗时是 421 秒,平 均发送速度约为 24000 条/秒。

出现性能偏差的原因是普通队列会由于内存不足而不得不将消息换页至磁盘。如果有消费者消费时,惰性队列会耗费将近 40MB 的空间来发送消息,对于一个消费者的情况,平均的消费速度约为 14000 条/秒。

二. 内存及磁盘告警

内存有使用上限阈值,磁盘有剩余空间阈值,达到时都会使RabbitMQ暂时阻塞 客户端的连接并停止接收从客户端发来的消息,以此避免服务崩溃。心跳检测也会失效。

检测状态:

1
$ rabbitmqctl list_connections
  • blocking:不发送消息的连接处于此状态。
  • blocked:一直有发送消息的处于此状态。

2.1 内存告警

RabbitMQ 服务器会在启动或者执行 rabbitmqctl set_vm_memory_high_watermark fraction 命令时计算系统内存的大小,默认 vm_memory_high_watermark 为0.4,表示内存阈值为0.4,当RabbitMQ使用内存超过40%就会告警并阻塞。

设置内存阈值:(不建议取值超过0.7)

  • 通过 rabbitmq.config 配置文件来配置:(需要重启服务器来启用)

    1
    2
    3
    4
    5
    6
    7
    8
    [
    {
    rabbit, [
    {vm_memory_high_watermark, 0.4}
    # 绝对值方式: {vm_memory_high_watermark, {absolute, 1073741824}} 也可以带单位{absolute,"1024MiB"}
    ]
    }
    ]
  • 通过 rabbitmqctl 指令来配置:(服务器重启后会失效)

    1
    2
    $ rabbitmqctl set_vm_memory_high_watermark 0.4
    $ rabbitmqctl set_vm_memory_high_watermark absolute {memory_limit}

当 Broker 节点达到内存阈值前会尝试将消息从内存换页到磁盘来释放空间(持久化和非持久化都会,持久化只须清除内存的副本即可),一般在内存阈值的50%会进行此操作。

设置换页阈值

  • 通过 rabbitmq.config 配置文件来配置:

    1
    2
    3
    4
    5
    6
    7
    8
    [
    {
    rabbit, [
    {vm_memory_high_watermark_paging_ratio, 0.75},
    {vm_memory_high_watermark, 0.4}
    ]
    }
    ]

2.2 磁盘告警

默认磁盘阈值为50MB,当磁盘空间低于这个值使会阻塞生产者并停止内存的换页操作。

RabbitMQ 会定期检测磁盘剩余空间,检测的频率与上一次执行检测到的磁盘剩余空间大小有关。正常情况下,每 10 秒执行一次检测,随着磁盘剩余空间与磁盘阈值的接近,检测频率会有所增加。当要到达磁盘阑值时,检测频率为每秒 10 次,这样有可能会增加系统的负载。

设置磁盘阈值:(不建议取值超过0.7)

  • 通过 rabbitmq.config 配置文件来配置:(需要重启服务器来启用)

    1
    2
    3
    4
    5
    6
    7
    8
    [
    {
    rabbit, [
    {disk_free_limit, 1000000000}
    # {disk_free_limit, "1GB"} 或 {disk_free_limit, {mem_relative, 1.0}} 磁盘与内存大小的比值,建议1到2
    ]
    }
    ]
  • 通过 rabbitmqctl 指令来配置:(服务器重启后会失效)

    1
    2
    $ rabbitmqctl set_disk_free_limit {disk_limit}
    $ rabbitmqctl set_disk_free_limit mem_relative {fraction}

三. 流量控制

RabbitMQ 从 2.8.0 版本引入流控(Flow Control)机制来保证稳定性。流控机制是用来避免消息的发送速率过快而导致服务器难以支撑的情形。

内存及磁盘告警相当于全局的流控,一旦触发会阻塞集群中所有的连接;而流控则只针对单个连接。

3.1 流控的原理-信用证算法

Erlang 进程之间并不共享内存(binary 类型的除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱(mailbox)。进程邮箱默认没有大小限制,所以大量消息发往某个进程会导致内存溢出并崩溃。对于RabbitMQ来说没有流控,很容易达到使邮箱大小达到内存阈值。

RabbitMQ 使用了一种基于信用证算法(credit-based algorithm)的流控机制来限制发送消息的速率以解决前面所提出的问题:它通过监控各个进程的进程邮箱,当某个进程负载过高而来不及处理消息时,这个进程的进程邮箱就会开始堆积消息。当堆积到一定量时,就会阻塞而不接收上游的新消息。从而慢慢地,上游进程的进程邮箱也会开始堆积消息。当堆积到一定量时也会阻塞而停止接收上游的消息,最后就会使负责网络数据包接收的进程阻塞而暂停接收新的数据。

3.1.1 信用证算法说明

如图 9-4 ,进程A接收消息并转发至进程B,进程B接收消息并转发至进程C。每一个进程中都有一对关于收发消息的 credit 值

以进程B为例:

  • { {credit_from, C}, value} 表示能发送多少条消息给C,每发送一条消息该值减1,当为0时,进程B不再往进程C发送消息也不再接收进程A的消息。
  • { {credit_to, A}, value} 表示再接收多少条消息就向进程A发送增加 credit 值的通知,进程A接收到该通知后就增加 { {credit_from, B}, value} 所对应的值,这样进程A就能持续发送消息。

当上游发送速率高于下游接收速率时, credit 值就会被逐渐耗光,这时进程就会被阻塞,阻塞的情况会一直传递到最上游。当上游进程收到来自下游进程的增加 credit 值的通知时,若此时上游进程处于阻塞状态则解除阻塞,开始接收更上游进程的消息,一个一个传导最终能够解除最上游的阻塞状态。由此可知,基于信用证的流控机制最终将消息发送进程的发送速率限制在消息处理进程的处理能力范围之内。

可以通过 rabbitmqctl list_connections 命令或者Web管理界面来查看Connection的状态。

Connection触发流控时会处于flow状态,这个状态值并没有什么作用。

3.1.2 流控链

流控机制不只是作用于 Connection ,同样作用于信道和队列。从Connection到Channel,再到队列,最后是消息持久化存储形成一个完整的流控链,对于处于整个流控链中的任意进程,只要该进程阻塞,上游的进程必定全部被阻塞。也就是说,如果某个进程达到性能瓶颈,必然会导致上游所有的进程被阻塞。所以我们可以利用流控机制的这个特点找出瓶颈之所在,处理消息的几个关键进程及其对应的顺序关系如图 9-6 所示:

  • rabbit_reader:Connection 的处理进程,负责接收、解析 AMQP 协议数据包等。
  • rabbit_channel:Channel 的处理进程,负责处理 AMQP 协议的各种方法、进行路由解析等。
  • rabbit_amqqueue_process:队列的处理进程,负责实现队列的所有逻辑。
  • rabbit_msg_store:负责实现消息的持久化。

3.1.3 判断瓶颈情况

  • 当某个 Connection 处于flow状态,但此 Connection 中没有一个Channel处于flow状态,表示有一个或多个Channel出现了性能瓶颈。
    • 某些 Channel 进程的运作(比如处理路由逻辑)会使得服务器 CPU 负载过高从而导致了此种情形,尤其是在发送大量较小的非持久化消息时,此种情形最易显现。
  • 当某个 Connection 处于flow状态,且此 Connection 中有若干个Channel处于flow状态,但没有一个队列处于flow状态,表示有一个或多个队列出现了性能瓶颈。
    • 这可能是由于将消息存入队列的过程中引起服务器 CPU 负载过高,或者是将队列中的消息存入磁盘的过程中引起服务器 I/O 负载过高而引起的此种情形。尤其是在发送大量较小的持久化消息时,此种情形最易显现。
  • 当某个 Connection 处于flow状态,且此 Connection 中有若干个Channel处于flow状态,且有若干个队列处于flow状态,表示在消息持久化时出现了性能瓶颈。
    • 在将队列中的消息存入磁盘的过程中引起服务器 I/O 负载过高而引起的此种情形。尤其是在发送大量较大的持久化消息时,此种情形最易显现。

3.2 案例:打破队列的瓶颈

通常会在rabbit_amqqueue_process即队列进程产生性能瓶颈,所以其上游的连接和信道会处于flow状态。

如何提高队列的性能?

  1. 开启Erlang的HiPE功能,保守可以提升30%~40%的性能。
  2. 打破队列的瓶颈,指用多个rabbit_amqqueue_process来打破单个队列的性能上限,充分利用上游被流控的性能。

如图:

多个队列会导致应用复杂化,所以首先要封装,将交换器、队列、绑定关系、生成和消费的方法全部封装,使应用就像仍在操作一个队列。

封装实现步骤:

  1. 声明交换器、队列、绑定关系:先与 Broker 建立连接,声明交换器一样,声明队列和绑定关系实际上是多个物理队列,先规划好分片数,分别声明对应数量的二者,如图 9-8。
  2. 封装生产者:
  3. 封装消费者:

(1)声明交换器、队列、绑定关系

实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* host,port,vhost,username,password值可以在rmq_cfg.properties 文件中配置
*/
public class RmqEncapsulation {
private static String host = "localhost";
private static int port = 5672;
private static String vhost = "/";
private static String username = "guest";
private static String password = "guest";

private static Connection connection;
private int subdivisionNum;//分片数,表示一个逻辑队列背后的实际队列数

public RmqEncapsulation(int subdivisionNum) {
this.subdivisionNum = subdivisionNum;
}

// 创建Connection
public static void newConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connection = connectionFactory.newConnection();
}

// 获取Connection
public static Connection getConnection() throws IOException, TimeoutException {
if (connection == null) {
newConnection();
}
return connection;
}

// 关闭Connection
public static void closeConnection() throws IOException {
if (connection != null) {
connection.close();
}
}

// 声明交换器
public void exchangeDeclare(Channel channel, String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException {
channel.exchangeDeclare(exchange, type, durable, autoDelete, autoDelete, arguments);
}

// 声明队列,这里针对单个Broker设计,若集群有多个节点,需要让分片队列能均匀的散开到各个节点,达到负载均衡
public void queueDeclare(Channel channel, String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
for (int i = 0;i < subdivisionNum;i++) {
String queueName = queue + "_" + i;
channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments);
}
}

// 创建绑定关系
public void queueBind(Channel channel, String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {
for (int i = 0;i < subdivisionNum;i++) {
String rkName = routingKey + "_" + i;
channel.queueBind(queueName, exchange, rkName, arguments);
}
}
}

使用封装工具来声明交换、队列和绑定关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
RmqEncapsulation rmqEncapsulation = new RmqEncapsulation(4);
try {
Connection connection = RmqEncapsulation.getConnection();
Channel channel = connection.createChannel();
rmqEncapsulation.exchangeDeclare(channel, "exchange", "direct", true, false, null);
rmqEncapsulation.queueDeclare(channel, "queue", true, false, false, null);
rmqEncapsulation.queueBind(channel, "queue", "exchange", "rk", null);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
RmqEncapsulation.closeConnection();
} catch (IOException e) {
e.printStackTrace();
}
}

(2)封装生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void basicPublish(Channel channel, String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException { 
//随机挑选一个队列发送
Random random = new Random();
int index = random.nextInt(subdivisionNum);
String rkName = routingKey + "_" + index;
channel.basicPublish(exchange, rkName, mandatory, props, body);
}

// 使用发送100条消息
Channel channel = connection.createChannel();
for(int i = 0;i < 100;i++) {
// Message用来封装消息
Message message = new Message();
// msgSeq表示消息的序号,有序
message.setMsgSeq(i);
// msgBody表示消息体,还有deliveryTag用于消息确认
message.setMsgBody("rabbitmq encapsulation");
// 通过Serializable接口来实现序列化,实际使用建议用ProtoBuff这种性能高的序列化工具
// 对象转换为字节数组
byte[] body = getBytesFromObject(message);
rmqEncapsulation.basicPublish(channel, "exchange", "rk", false, MessageProperties.PERSISTENT_TEXT_PLAIN, body);
}

封装4个物理队列与单个独立队列的QPS对比:

(3)封装消费者

消费分推模式和拉模式,拉模式的封装实现比较简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public GetResponse basicGet(Channel channel, String queue, boolean autoAck) throws IOException { 
// 首先随机拉取一个物理队列的数据
GetResponse getResponse = null;
Random random = new Random();
int index = random.nextInt(subdivisionNum);
getResponse = channel.basicGet(queue+"_"+index, autoAck);
// 若返回为空,再顺序拉取
if (getResponse == null) {
for (int i = 0;i < subdivisionNum;i++) {
String queueName = queue + "_" + i;
getResponse = channel.basicGet(queueName, autoAck);
if (getResponse != null) {
return getResponse;
}
}
}
return getResponse;
}

当生产者发送速度大于消费者消费速度时,顺序拉取可能只拉取到第一个物理队列的数据,而其余3个物理队列的数据可能会被长久积压。

推模式封装需要在 RmqEncapsulation 中添加一个 ConcurrentLinkedDeque<Message> 类型的成员变量 blockingQueue,用来缓存推送的数据以方便消费者消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class RmqEncapsulation {
......

private static Connection connection;
private int subdivisionNum;//分片数,表示一个逻辑队列背后的实际队列数
private ConcurrentLinkedDeque<Message> blockingQueue;

public RmqEncapsulation(int subdivisionNum) {
this.subdivisionNum = subdivisionNum;
blockingQueue = new ConcurrentLinkedDeque<Message>();
}

......
//省略 newConnection 方法、 getConnection 方法、 closeConnect on 方法的实现
//省略 exchangeDeclare 方法、 queue Declare 方法、 queueBind 方法的实现
//省略 basicPublish 方法和 basicGet 方法的实现
private void startConsume(Channel channel, String queue, boolean autoAck, String consumerTag, ConcurrentLinkedDeque<Message> newblockingQueue) throws IOException {
for (int i = 0;i < subdivisionNum;i++){
String queueName = queue + "_" + i;
channel.basicConsume(queueName, autoAck, consumerTag + i, new NewConsumer(channel, newblockingQueue));
}
}

// 推模式
public void basicConsume(Channel channel, String queue, boolean autoAck, String consumerTag, ConcurrentLinkedDeque<Message> newblockingQueue, IMsgCallback iMsgCallback) throws IOException {

startConsume(channel, queue, autoAck, consumerTag, newblockingQueue);
while (true) {
Message message = newblockingQueue.peekFirst();
if (message != null) {
// IMsgCallback包含一个回调函数consumeMsg(Message message)
ConsumeStatus consumeStatus = iMsgCallback.consumeMsg(message);
newblock ngQueue removeFirst()
if (consumeStatus == ConsumeStatus.SUCCESS) {
channel.basicAck(message.getDeliveryTag(), false);
} else {
channel.basicReject(message.getDeliveryTag(), false);
}
} else {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

// 获取Broker中的数据然后存入blockingQueue中
public static class NewConsumer extends DefaultConsumer {
private ConcurrentLinkedDeque<Message> newblockingQueue;
public NewConsumer(Channel channel, ConcurrentLinkedDeque<Message>
newblockingQueue) {
super(channel);
this.newblockingQueue = newblockingQueue;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Message message = (Message) getObjectFromBytes(body);
message.setDeliveryTag(envelope.getDeliveryTag());
newblockingQueue.addLast(message);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
}

推模式使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
Channel channel = connection.createChannel();
channel.basicQos(64);
rmqEncapsulation.basicConsume(channel, "queue", false, "consumer_zzh", rmqEncapsulation.blockingQueue, new IMsgCallback() {
@Override
public ConsumeStatus consumeMsg(Message message) {
ConsumeStatus consumeStatus = ConsumeStatus.FAIL;
if (message != null) {
System.out.println(message);
consumeStatus = ConsumeStatus.SUCCESS;
}
return consumeStatus;
}
});

示例省去了很多的功能,局限性很强,比如没有使用 publisher confirm 机制;没有设置 mandatory mandory 参数;只能使用一个 Connection;消息没有使用 Protostuff 进行序列化等。

发送端根据 Message 的消息序号 msgSeq 对分片个数进行取模运算,之后将对应的消息发送到对应的队列中,这样消息可以均匀且顺序地在每个队列中存储。在消费端为每个队列创建一个消息槽(slot),从队列中读取的消息都存入对应的槽中,发送到下游的消息可以依次从slot0到slot3进行读取。

四. 镜像队列

如果RabbitMQ 集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的 durable 属性也设置为true ,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisher confirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此, 一般不希望遇到因单点故障导致的服务不可用。

尽管交换器和绑定关系能够在单点故障问题上幸免于难,但是队列和其上的存储的消息却不行,这是因为队列进程及其内容仅仅维持在单个节点之上,所以一个节点的失效表现为其对应的队列不可用。

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。在通常的用法中,针对每一个配置镜像的队列(以下简称镜像队列〉都包含一个主节点(master )和若干个从节点( slave ),相应的结构可以参考图 9-11 :

slave 会准确地按照 master 执行命令的顺序进行动作,故 slave 与 master 上维护的状态应该是相同的。如果 master 由于某种原因失效,那么“资历最老”的 slave 会被提升为新的 master 。根据 slave 加入的时间排序,时间最长的 slave 即为“资历最老”。发送到镜像队列的所有消息会被同时发往 master 和所有的 slave 上,如果此时 master 挂掉了,消息还会在 slave 上,这样 slave 提升为 master 的时候消息也不会丢失。除发送消息(Basic.Publish)外的所有动作都只会向 master 发送,然后再由 master 将命令执行的结果广播给各个 slave 。

如果消费者与slave 建立连接井进行订阅消费,其实质上都是从master 上获取消息,只不过看似是从slave 上消费而己。比如消费者与slave 建立了TCP 连接之后执行一个Basic.Get的操作,那么首先是由slave 将Basic.Get 请求发往master ,再由master 准备好数据返回给slave ,最后由slave 投递给消费者。

大多的读写压力都落到了master 上,那么这样是否负载会做不到有效的均衡?或者说是否可以像MySQL 一样能够实现master 写而slave 读呢?注意这里master 和slave 是针对队列而言的,而队列可以均匀地散落在集群的各个Broker 节点中以达到负载均衡的目的,因为真正的负载还是针对实际的物理机器而言的,而不是内存中驻留的队列进程。

在图 9-12 中, 集群中的每个Broker 节点都包含1 个队列的master 和2 个队列的slave, Q1的负载大多都集中在broker1 上, Q2 的负载大多都集中在broker2 上, Q3 的负载大多都集中在broker3 上,只要确保队列的master 节点均匀散落在集群中的各个Broker 节点即可确保很大程度上的负载均衡(每个队列的流量会有不同,因此均匀散落各个队列的master 也无法确保绝对的负载均衡)。至于为什么不像MySQL 一样读写分离, RabbitMQ 从编程逻辑上来说完全可以实现,但是这样得不到更好的收益,即读写分离并不能进一步优化负载,却会增加编码实现的复杂度,增加出错的可能,显得得不偿失。

RabbitMQ 的镜像队列同时支持publisher confirm 和事务两种机制.在事务机制中,只有当前事务在全部镜像中执行之后,客户端才会收到Tx.Commit-Ok 的消息。同样的,在publisherconfirm 机制中, 生产者进行当前消息确认的前提是该消息被全部进行所接收了。

不同于普通的非镜像队列(参考图9-2 ),镜像队列的backing_queue 比较特殊,其实现并非是rabbit_variable_queue ,它内部包裹了普通backing_queue 进行本地消息消息持久化处理,在此基础上增加了将消息和ack 复制到所有镜像的功能。镜像队列的结构可以参考图9-13, master 的backing_queue 采用的是rabbit_mirror_queue_master ,而slave的backing queue 实现是rabbit_mirror_queue_slave。

所有对 rabbit_mirror_queue_master 的操作都会通过组播GM (Guaranteed Multicast) 的方式同步到各个slave 中。GM 负责消息的广播, rabbit_mirror_queue_slave 负责回调处理,而master 上的回调处理是由coordinator 负责完成的。如前所述,除了 Basic.Publish ,所有的操作都是通过master 来完成的, master 对消息进行处理的同时将消息的处理通过GM 广播给所有的slave , s lave 的GM 收到消息后,通过回调交由 rabbit_mirror queue_slave 进行实际的处理。

GM 模块实现的是一种可靠的组播通信协议, 该协议能够保证组播消息的原子性,即保证组中活着的节点要么都收到消息要么都收不到,它的实现大致为:将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上:当有节点失效时,相邻的节点会接管以保证本次广播的消息会复制到所有的节点。在master 和slave 上的这些GM 形成一个组(gm_group),这个组的信息会记录在Mnesia 中。不同的镜像队列形成不同的组。操作命令从master 对应的GM 发出后,顺着链表传送到所有的节点。由于所有节点组成了一个循环链表, master 对应的GM 最终会收到自己发送的操作命令,这个时候master 就知道该操作命令都同步到了所有的slave 上。

新节点的加入过程可以参考图9-14 ,整个过程就像在链表中间插入一个节点。注意每当一个节点加入或者重新加入到这个镜像链路中时,之前队列保存的内容会被全部清空。

当slave 挂掉之后,除了与slave 相连的客户端连接全部断开,没有其他影响。当master 挂掉之后,会有以下连锁反应:

  1. 与master 连接的客户端连接全部断开。
  2. 选举最老的slave 作为新的master ,因为最老的s lave 与旧的master 之间的同步状态应该是最好的。如果此时所有slave 处于未同步状态,则未同步的消息会丢失。
  3. 新的master 重新入队所有unack 的消息,因为新的slave 无法区分这些unack 的消息是否己经到达客户端,或者是ack 信息丢失在老的master 链路上,再或者是丢失在老的master 组播ack 消息到所有slave 的链路上,所以出于消息可靠性的考虑,重新入队所有unack 的消息,不过此时客户端可能会有重复消息。
  4. 如果客户端连接着slave ,并且Basic.Consume 消费时指定了x-cancel-on-ha-failover 参数,那么断开之时客户端会收到一个Consumer Cancellation Notification 的通知,消费者客户端中会回调Consumer 接口 handleCancel 方法。如果未指定 x-cancel-on-ha-failover 参数,那么消费者将无法感知master 岩机。

x-cancel-on-ha-failover 参数的使用示例如下:

1
2
3
4
5
Channel channel= ... ;
Consumer consumer= ... ;
Map<String, Object> args = new HashMap <String, Object>();
args.put("x-cancel-on-ha-failover", true);
channel.basicConsume ("my-queue", false, args, consumer);

镜像队列的配置主要是通过添加相应的Policy 来完成的,rabbitmqctl set_policy [-p vhost] [--priority priority] [ --apply-to apply-to] {name} {pattern} {definition} 命令中的 definition 部分,对于镜像队列的配置来说, definition 中需要包含3个部分: ha-mode 、ha-params 和ha-sync-mode :

  • ha-mode : 指明镜像队列的模式,有效值为all 、exactly 、nodes ,默认为all 。
    • all 表示在集群中所有的节点上进行镜像;
    • exactly 表示在指定个数的节点上进行镜像,节点个数由ha-params 指定;
    • nodes 表示在指定节点上进行镜像,节点名称通过ha-params 指定,节点的名称通常类似于rabbit@hostname ,可以通过 rabbitmqctl cluster_status 命令查看到。
  • ha-params : 不同的ha-mode 配置中需要用到的参数。
  • ha-sync-mode : 队列中消息的同步方式,有效值为automatic 和manual。

ha-mode 参数对排他(exclusive)队列并不生效,因为排他队列是连接独占的,当连接断开时队列会自动删除,所以实际上这个参数对排他队列没有任何意义。

将新节点加入己存在的镜像队列时,默认情况下ha-sync-mode 取值为manual ,镜像队列中的消息不会主动同步到新的s lav e 中,除非显式调用同步命令。当调用同步命令后,队列开始阻塞,无法对其进行其他操作,直到同步完成。当ha-sync-mode 设置为automatic 时,新加入的slave 会默认同步己知的镜像队列。由于同步过程的限制,所以不建议对生产环境中正在使用的队列进行操作。使用rabbitrnqctl list_queues {name} slave_pids synchronised_slave_pids 命令可以查看哪些slaves 已经完成同步。通过手动方式同步一个队列的命令为 rabbitrnqctl sync queue {name} ,同样也可以取消某个队列的同步操作: rabbitrnqctl cancel sync queue {name}

当所有slave 都出现未同步状态,并且ha-promote-on-shutdown 设置为when-synced(默认)时,如果master 因为主动原因停掉,比如通过rabbitrnqctl stop 命令或者优雅关闭操作系统,那么slave 不会接管master ,也就是此时镜像队列不可用;但是如果master 因为被动原因停掉,比如Erlang 虚拟机或者操作系统崩溃,那么slave 会接管master 。这个配置项隐含的价值取向是保证消息可靠不丢失,同时放弃了可用性。如果ha-promote-on-shutdown 设置为always ,那么不论master 因为何种原因停止, slave 都会接管master ,优先保证可用性,不过消息可能会丢失。

镜像队列中最后一个停止的节点会是master ,启动顺序必须是master 先启动。如果slave先启动,它会有30 秒的等待时间,等待master 的启动,然后加入到集群中。如果30 秒内master没有启动, slave 会自动停止。当所有节点因故(断电等)同时离线时,每个节点都认为自己不是最后一个停止的节点,要恢复镜像队列,可以尝试在30 秒内启动所有节点。


参考:

🔗 《RabbitMQ实战指南》