RabbitMQ(六)运维

RabbitMQ(六)运维

一. 集群搭建

单台 RabbitMQ 服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是解决实际问题的关键。

RabbitMQ 集群也不能保证消息的万无一失,即将消息、队列、交换器等都设置为可持久化,生产端和消费端都正确地使用了确认方式。当集群中一个 RabbitMQ 节点崩溃时,该节点上的所有队列中的消息也会丢失。 RabbitMQ 集群中的所有节点都会备份所有的元数据信息, 包括以下内容:

  • 队列元数据:队列的名称及属性;
  • 交换器:交换器的名称及属性:
  • 绑定关系元数据:交换器与队列或者交换器与交换器之间的绑定关系;
  • vhost 元数据:为 vhost 内的队列、交换器和绑定提供命名空间及安全属性。

但是不会备份消息(当然通过特殊的配置比如镜像队列可以解决这个问题)。基于存储空间和性能的考虑,在 RabbitMQ 集群中创建队列,集群只会在单个节点而不是在所有节点上创建队列的进程并包含完整的队列信息(元数据、状态、内容)。这样只有队列的宿主节点,即所有者节点知道队列的所有信息,所有其他非所有者节点只知道队列的元数据和指向该队列存在的那个节点的指针。因此当集群节点崩溃时,该节点的队列进程和关联的绑定都会消失。附加在那些队列上的消费者也会丢失其所订阅的信息,井且任何匹配该队列绑定信息的新消息也都会消失。

不同于队列那样拥有自己的进程,交换器其实只是一个名称和绑定列表。当消息发布到交换器时,实际上是由所连接的信道将消息上的路由键同交换器的绑定列表进行比较,然后再路由消息。当创建一个新的交换器时, RabbitMQ 所要做的就是将绑定列表添加到集群中的所有节点上。这样,每个节点上的每条信道都可以访问到新的交换器了。

创建集群的过程可以看作向集群中添加节点的过程。

1.1 多机多节点配置

多机多节点主要是指在每台机器中部署一个 RabbitMQ 服务节点,进而由多台机器组成一个 RabbitMQ 集群。

假设这里一共有三台物理主机,均己正确地安装了 RabbitMQ ,且主机名分别为 node1、node2、node3。RabbitMQ 集群对延迟非常敏感,应当只在本地局域网内使用 。在广域网中不应该使用集群,而应该使用 Federation 或者 Shovel 来代替

1.1.1 配置流程

  • 第一步,配置各个节点的 hosts 文件,让各个节点都能互相识别对方的存在。

    • 比如在 Linux 系统中可以编辑 /etc/hosts 文件,在其上添加地址与节点名称的映射信息:

    • ```
      192.168.0.2 node1
      192.168.0.3 node2
      192.168.0.4 node3

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33

      * 第二步,编辑 RabbitMQ 的 cookie 文件,以确保各个节点的 cookie 文件使用的是同一个值。

      * 可以读取 node1 节点的 cookie 值,然后将其复制到 node2、node3 节点中。

      * cookie 文件默认路径为:`/var/lib/rabbitmq/.erlang.cookie` 或者 `$HOME/.erlang.cookie` 。

      * 集群中的 RabbitMQ 节点需要通过交换密钥令牌以获得相互认证。

      * 如果节点的密钥令牌不一致,那么在配置节点时就会有如下的报错:

      ```shell
      $ rabbitmqctl join_cluster rabbit@node1
      Clustering node rabbit@node2 with rabbit@node1
      Error: unable to connect to nodes [rabbit@node1]: nodedown

      DIAGNOSTICS
      ===========

      attempted to contact: [rabbit@node1]

      rabbit@node1:
      * connected to epmd (port 4369) on node1
      * epmd reports node rabbit runn ng on port 25672
      * TCP connection succeeded but Erlang distribution failed

      # 注意此处
      * Authentication failed (rejected by the remote node), please check the Erlang cookie

      current node details:
      - node name: 'rabbitmq-cli-53@node2'
      - home dir: /root
      - cookie hash: kLtTY75JJGZnZpQF7CqnYg==
  • 第三步,配置集群。

    • 配置集群有三种方式 :

      • 通过 rabbitmqctl 工具配置;
      • 通过 rabbitmq.config 配置文件配置;
      • 通过 rabbitmq-autocluster 插件配置。
    • 示例通过 rabbitmqctl 工具配置:

      • 首先启动 node1、node2、node3 这几个节点的 RabbitMQ 服务:

        1
        2
        3
        [root@node1 ~]# rabbitmq-server -detached
        [root@node2 ~]# rabbitmq-server -detached
        [root@node3 ~]# rabbitmq-server -detached
      • 开启后这3个节点目前都是以独立节点存在的单个集群,查看各个节点的状态:

        1
        2
        3
        [root@node1 ~]# rabbitmqctl cluster_status 
        [root@node2 ~]# rabbitmqctl cluster_status
        [root@node3 ~]# rabbitmqctl cluster_status
      • 接下来为了将这3个节点组成一个集群,需要以 node1 节点为基准,将 node2 和 node3 节点加入 node1 节点的集群中。这3个节点是平等的,如果想调换彼此的加入顺序也未尝不可。

        1. 首先将 node2 节点加入 node1 节点的集群中:(此时再通过 rabbitmqctl cluster_status 可以看到两个节点的信息)

          1
          2
          3
          4
          5
          6
          7
          8
          [root@node2 ~]# rabbitmqctl stop app
          Stopping rabbit application on node rabbit@node2
          [root@node2 ~]# rabbitmqctl reset
          Resetting node rabb t@node2
          [root@node2 ~]# rabbitmqctl join cluster rabbit@node1
          Clustering node rabbit@node2 with rabbit@node1
          [root@node2 ~]# rabbitmqctl start_ app
          Starting node rabbit@node2
        2. 再将 node3 节点也加入 node1 节点所在的集群中,步骤同上。

1.1.2 节点关闭的几种情况

  • 如果集群中某个节点关闭了,会使集群处于怎样的状态?

    1
    2
    3
    4
    5
    # 关闭 node2 节点
    [root@node2 ~]# rabbitmqctl stop app
    Stopping rabbit application on node rabbit@node2
    # 在 node1 节点查看集群状态时,会发现 running_nodes 已没有 node2
    [root@node1 ~]# rabbitmqctl cluster_status
  • 如果关闭了集群中的所有节点,则需要确保在启动的时候最后关闭的那个节点是第一个启动的

    • 如果第一个启动的不是最后关闭的节点,那么这个节点会等待最后关闭的节点启动。这个等待时间是 30 秒,如果没有等到,那么这个先启动的节点也会失败。
    • 在最新的版本中会有重试机制,默认重试 10 次 30 秒以等待最后关闭的节点启动。
  • 如果最后一个关闭的节点最终由于某些异常而无法启动,则可以通过 rabbitmqctl forget_cluster_node 命令来将此节点剔出当前集群。

  • 如果集群中的所有节点由于某些非正常因素,比如断电而关闭,那么集群中的节点都会认为还有其他节点在它后面关闭,此时需要调用 rabbitmqctl force_boot 命令来启动一个节点,之后集群才能正常启动。

1.2 集群节点类型

1.2.1 两种节点类型

使用 rabbitmqctl cluster_status 命令来查看集群状态时会有 {nodes, [{disc, [rabbit@node1,rabbit@node2,rabbit@node3]}] 这一项信息,其中的 disc 标注了 RabbitMQ 点的类型。

不论是单一节点还是集群节点,有两种类型:

  • 内存节点:将所有的队列、 交换器、绑定关系、用户、权限和 vhost 的元数据定义都存储在内存中;
  • 磁盘节点:存放到磁盘中。所以单节点集群只能是磁盘节点,否则重启后会丢失系统的配置信息。

1.2.2 如何指定或切换节点类型

将节点指定为内存节点:

1
2
3
4
5
6
7
8
9
10
11
# 将 node2 节点加入 node1 节点的时候可以指定 node2 节点的类型为内存节点
[root@node2 ~]# rabbitmqctl join_cluster rabbit@node1 --ram
Clustering node rabbit@node2 with rabbit@node1
# 也可以在集群搭建好以后再通过 rabbitmqctl change_cluster_node_type {disc, ram}命令切换类型
[root@node2 ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node2
[root@node2 ~]# rabbitmqctl change cluster node type disc
Turning rabbit@node2 into a disc node
[root@node2 ~]# rabbitmqctl start app
Starting node rabbit@node2
[root@node2 ~]# rabbitmqctl cluster_status

1.2.3 如何选择内存还是磁盘节点

在集群中创建队列、交换器或者绑定关系的时候,这些操作直到所有集群节点都成功提交元数据变更后才会返回。对内存节点来说,这意味着将变更写入内存;而对于磁盘节点来说,这意味着昂贵的磁盘写入操作。内存节点可以提供出色的性能,磁盘节点能够保证集群配置信息的高可靠性,如何在这两者之间进行抉择呢?

  • 首先集群中至少有一个磁盘节点;
  • 新增或删除节点时需要将变更通知到至少一个磁盘节点;
  • 如果只有一个磁盘节点,而且它刚好崩溃了,集群可以继续发送或者接收消息,但是不能执行创建队列、交换器、绑定关系、用户,以及更改权限、添加或删除集群节点的操作了。即唯一磁盘节点崩溃时,集群可以继续运行,但不能变更
  • 在内存节点重启后,它们会连接到预先配置的磁盘节点,下载当前集群元数据的副本。当在集群中添加内存节点时,确保告知其所有的磁盘节点(内存节点唯一存储到磁盘的元数据信 息是集群中磁盘节点的地址)。只要内存节点可以找到至少一个磁盘节点,那么它就能在重启后重新加入集群中。
  • 除了使用RPC功能时,大多数的操作就是生产或者消费消息。为了确保集群信息的可靠性,或者在不确定使用磁盘节点或者内存节点的时候,建议全部使用磁盘节点

1.3 剔除单个节点

1.3.1 如何从集群中删除一个节点

两种方案:

  • 第一种:适合节点不再运行RabbitMQ的情况。

    • 首先在 node2 节点上执行 rabbitmqctl stop_app 或者 rabbitmqctl stop 命令来关闭 RabbitMQ 服务;
    • 然后再在 node1 节点或者 node3 节点上执行 rabbitmqctl forget_cluster_node rabbit@node2 (可以添加 -offline 即使非运行状态也可以生效)命令将 node2 节点剔除出去。
  • 第二种:只是简单的将节点从集群中移出,变成单一节点。

    • 在 node2 上执行 rabbitmqctl reset 命令。此命令将清空节点的状态,并将其恢复到空白状态,也会和集群中的磁盘节点进行通信,告诉它们该节点正在离开集群。

    • ```shell
      [root@node2 ~]# rabbitmqctl stop app
      Stopping rabbit application on node rabbit@node2
      [root@node2 ~]# rabbitmqctl reset
      Resetting node rabbit@node2
      [root@node2 ~]# rabbitmqctl start app
      Starting node rabbit@node2

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66



      ### 1.4 集群节点的升级



      #### 1.4.1 独立节点的升级



      只须先关闭原来的服务,然后解压新的版本再运行即可。不过要确保原节点的 Mnesia 中的数据不被变更,且新节点中的 Mnesia 路径的指向要与原节点中的相同。



      #### 1.4.2 集群节点的升级步骤



      单个节点的升级步骤:

      1. 关闭所有节点的服务,注意采用 `rabbitmqctl stop` 命令关闭。
      2. 保存各个节点的 Mnesia 数据。
      3. 解压新版本的 RabbitMQ 到指定的目录。
      4. 指定新版本的 Mnesia 路径为步骤2中保存的 Mnesia 数据路径。
      5. 启动新版本的服务,注意先重启原版本中最后关闭的那个节点。



      步骤4和步骤5可以一起操作,比如执行 `RABBITMQ_MNESIA_BASE=/opt/mnesia rabbitmq-server-detached` 命令,其中 `/opt/mnesia` 为原版本保存 Mnesia 数据的路径。



      **在对不同版本升级的过程中,最好先测试两个版本互通的可能性,然后再在线上环境中实地操作。**



      如果原集群上的配置和数据都可以舍弃,则可以删除原版本的 RabbitMQ ,然后再重新安装配置即可:如果配置和数据不可丢弃 ,则按照上面所述保存元数据,之后再关闭所有生产者,并等待消费者消费完队列中的所有数据,紧接着关闭所有消费者,然后重新安装 RabbitMQ 重建元数据等。



      也可以利用集群迁移直接转为新的集群。



      ### 1.5 单机多节点配置



      有时候不得不在单台物理机器上去创建一个多 RabbitMQ 服务节点的集群,需要**确保每个节点都有独立的名称、数据存储位置、端口号(包括插件的端口号)等**。



      我们在主机名称为 node1 的机器上创建一个由 rabbit1@node1、rabbit2@node 1 和 rabbit3@node1 这3个节点组成 RabbitMQ 集群。



      * 为每个 RabbitMQ 服务节点设置不同的端口号和节点名称来启动相应的服务。

      ```shell
      [root@node1 ~]# RABBITMQ NODE PORT=5672 RABBITMQ NODENAME=rabbit1
      rabbitmq-server -detached
      [root@node1 ~]# RABBITMQ NODE PORT=5673 RABBITMQ NODENAME=rabbit2
      rabbitmq-server -detached
      [root@node1 ~]# RABBITMQ NODE PORT=5674 RABBITMQ NODENAME=rabbit3
      rabbitmq-server -detached
  • 在启动 rabbit1@node1 节点的服务之后,继续启动 rabbit2@node1 和 rabbit@node1 服务节点会遇到启动失败的情况。

    • 这种情况大多数是由于配置发生了冲突而造成后面的服务节点启动失 败,需要进一步确认是否开启了某些功能,比如 RabbitMQ Management 插件。

    • 如果开启了 RabbitMQ Management 插件,就需要为每个服务节点配置一个对应插件端口号:

      1
      2
      3
      [root@node1 ~]# RABBITMQ NODE PORT=5672 RABBITMQ NODENAME=rabbit1 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672)]" rabbitmq-server -detached
      [root@node1 ~]# RABBITMQ NODE PORT=5673 RABBITMQ NODENAME=rabbit2 RABBITMQ_SERVER_START_ARGS="- rabbitmq_management listener [{port, 15673}]" rabbitmq-server -detached
      [root@node1 ~]# RABBITMQ NODE PORT=5674 RABBITMQ NODENAME=rabbit3 RABBITMQ_SERVER_START_ARGS="- rabbitmq_management listener [{port,15674}]" rabbitmq-server -detached
  • 启动各节点服务之后,将 rabbit2@node1 节点加入 rabbit1@node1 集群之中:

    1
    2
    3
    4
    5
    6
    7
    8
    [root@node1 ~] # rabbitmqctl -n rabbit2@node1 stop app
    Stopping rabbit application on node rabbit2@node1
    [root@node1 ~J # rabbitmqctl -n rabbit2@node1 reset
    Resetting node rabbit2@node1
    [root@node1 ~]# rabbitmqctl -n rabbit2@node1 join_cluster rabbit1@node1
    Clustering node rabbit2@node1 with rabbit1@node1
    [root@node1 ~]# rabbitmqctl -n rabbit2@node1 start_app
    Starting node rabbit2@node1
  • 执行相似的操作将 rabbit3@node1 也加入进来,并通过 rabbitmqctl cluster_status 命令来查看各个服务节点的集群状态:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    [root@node1 ~]# rabbitmqctl -n rabbit1@node1 cluster_status
    Cluster status of de rabbit1@node1
    [{nodes, [{disc, [rabbit1@node1, rabbit2@node1, rabbit3@node1]}]},
    {running_nodes, [rabbit3@node1, rabbit2@node1, rabbit1@node1]},
    {cluster_name, <<"rabbit1@node1">>},
    {partitions, []},
    {alarms, [{rabbit3@node1, []}, {rabbit2@node1, []}, {rabbit1@node1, []}]}]
    [root@node1 ~]# rabbitmqctl -n rabbit2@node1 cluster_status
    Cluster status of node rabbit2@node1
    [{nodes, [{disc, [rabbit1@node1, rabbit2@node1, rabbit3@node1]}]},
    {running_nodes, [rabbit3@node1, rabbit1@node1, rabbit2@node1]},
    {cluster_name, <<"rabbit1@node1">>},
    {partitions, []} ,
    {alarms, [{rabbit3@node1, []} , {rabbit1@node1, []}, {rabbit2@node1, []}]}]
    [root@node1 ~]# rabbitmqctl -n rabbit3@node1 cluster_status
    Cluster status of node rabbit3@node1
    [{nodes, [{disc, [rabbit1@node1, rabbit2@node1, rabbit3@node1]}]},
    {running_nodes, [rabbit1@node1, rabbit2@node1, rabbit3@node1]} ,
    {cluster_name, <<"rabbit1@node1">>},
    {partitions, []} ,
    {alarms, [{rabbit1@node1, []}, {rabbit2@node1, []}, {rabbit3@node1, []}]}]

RabbitMQ 单机多节点配置大多用于实验性论证,生产环境还是选用多机多节点的集群。

二. 查看服务日志

RabbitMQ 日志中包含各种类型的事件,比如连接尝试、服务启动、插件安装及解析请求时的错误等

RabbitMQ 的日志默认存放在 $RABBITMQ_HOME/var/log/rabbitmq 文件夹内。在这个文件夹内 RabbitMQ 会创建两个日志文件:

  • RABBITMQ_NODENAME-sasl.log :SASL (System Application Support Libraries,系统应用程序支持库)是库的集合,作为 Erlang-OTP 发行版的一部分。提供了一系列标准,其中之一是日志记录格式。**当 RabbitMQ 记录 Erlang 相关信息时,它会将日志写入文件RABBITMQ_NODENAME-sasl.log **。
  • RABBITMQ_NODENAME.log :RabbitMQ 服务日志指的就是这个文件。

2.1 启动RabbitMQ服务

  • 使用 rabbitmq-server -detached 命令启动RabbitMQ服务,顺带会启动 Erlang 虚拟机和RabbitMQ应用服务。
  • 使用 rabbitmqctl start_app 用来启动 RabbitMQ 应用服务(启动成功的前提是 Erlang 虚拟机运转正常)。
  • 如果使用 rabbitmqctl stop_app 命令关闭的 RabbitMQ 应用服务,那么在使用 rabbitmqctl start_app 命令开启 bbitMQ 应用服务时的启动日志和 rabbitmq-server 的启动日志相同。

相关服务日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# RabbitMQ 版本号、Erlang 的版本号
Starting RabbitMQ 3.6.2 on Erlang 19.1
Copyright (C) 2007-2016 Pivotal Software, Inc .
Licensed under the MPL. See http://www.rabbitmq.com/

# INFO REPORT 和 WARNING REPORT 表示日志级别
# RabbitMQ 服务节点名称、cookie 的 hash 值、配置文件地址
=INFO REPORT==== 3-Oct-2017: :10:52:08 ===
node : rabbit@node1
home dir : /root
config file(s) : /opt/rabbitmq/etc/rabbitmq/rabbitmq.config (not found)
cookie hash : VCwbL3S9/ydrGgVsrLjVkA==
log : /opt/rabbitmq/var/log/rabbitmq/rabbit@node1.log
sasl log : pt/rabbitmq /var/ log/rabbitmq/rabbit@node1-sasl.log
database dir : /opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node1

# 内存限制
=INFO REPORT==== 3-Oct-2017: :10:52:09 ===
Memory limit set to 3148MB of 7872MB total.

# 磁盘限制
=INFO REPORT==== 3-Oct-2017: :10:52:09 ===
Disk free limit set to 50MB

=INFO REPORT==== 3-Oct-2017: :10:52:09 ===
Limiting to approx 924 file handles (829 sockets)

=INFO REPORT==== 3-Oct-2017: :10:52:09 ===
FHC read buffering: OFF
FHC write buffering: ON

=INFO REPORT==== 3-Oct-2017: :10:52:09 ===
Database rectory at /opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node1 is
empty. Initialising from scratch...

=INFO REPORT==== 3-Oct-2017: :10:52:10 ===
Priority queues enabled, real BQ is rabbit_variable_queue

=INFO REPORT==== 3-Oct-2017: :10:52:10 ===
Adding vhost '/'

# 默认账户 guest 的创建及权限配置
=INFO REPORT==== 3-Oct-2017: :10:52:10 ===
Creating user 'guest'

=INFO REPORT==== 3-Oct-2017: :10:52:10 ===
Setting user tags for user 'guest' to [admistrator]

=INFO REPORT==== 3-Oct-2017: :10:52:10 ===
Setting permissions for 'guest' in to '.*', '.*', '.*'

=INFO REPORT==== 3-Oct-2017: :10:52:10 ===
msg_store transient: using rabbit_msg_store_ets_index to provide index

=INFO REPORT==== 3-Oct-2017: :10:52:10 ===
msg_store persistent using rabbit_msg_store_ets_index to provide index

=WARNING REPORT==== 3-Oct-2017: :10:52:10 ===
msg_store persistent: rebuilding indices from scratch

=INFO REPORT==== 3-Oct-2017: :10:52:10 ===
started TCP Listener on [::]:5672

=INFO REPORT==== 3-Oct-2017 ::10:52 : 10 ===
Server startup complete; 0 plugins started.

日志级别:(通过 rabbitmq.config 配置文件中 log_levels 参数来设置,默认为 [{connection, info}]

  • none
  • error
  • warning
  • info
  • debug

2.2 关闭RabbitMQ服务

如果使用 rabbitmqctl stop 命令,会将 Erlang 虚拟机一同关闭,而 rabbitmqctl stop_app 只关闭 RabbitMQ 应用服务。

1
2
3
4
5
6
7
8
9
10
11
12
=INFO REPORT==== 3-Oct-2017: :10:54:01 ===
Stopping RabbitMQ

=INFO REPORT==== 3-Oct-2017: :10:54:01 ===
stopped TCP Listener on [::]:5672

=INFO REPORT==== 3-Oct-2017: :10:54:01 ===
Stopped RabbitMQ application

# 如果使用 rabbitmqctl stop 来进行关闭操作,则会多出下面的日志信息,即关闭 Erlang 虚拟机。
=INFO REPORT==== 3-Oct-2017: :10:54:01 ===
Halting Erlang VM

2.3 建立集群

举例将节点 rabbit@node2 和 rabbit@node1 组成一个集群:

  • 首先在 rabbit@node2 中执行 rabbitmq-server -detached 开启 Erlang 虚拟机和 RabbitMQ 应用服务,之后再执行 rabbitmqctl stop_app 来关闭 RabbitMQ 应用服务,之后需要重置节点 rabbit@node2 中的数据 rabbitmqctl reset ,在 rabbit@node2 节点输出如下日志:

    1
    2
    =INFO REPORT==== 3-Oct-2017: :11:25:01 ===
    Resetting Rabbit
  • 在 rabbit@node2 节点上执行 rabbitmqctl join_clcuster rabbit@node1 ,将其加 rabbit@node1 中以组成一个集群,在 rabbit@node2 节点输出如下日志:

    1
    2
    =INFO REPORT==== 3-Oct-2017: :11:30:46 ===
    Clustering with [rabbit@node1] as disc node
  • 同时在 rabbit@node1 节点输出如下日志:

    1
    2
    =INFO REPORT==== 3-Oct-2017: :11:30:56 ===
    node rabbit@node2 up
  • 如果此时在 rabbit@node2 节点上执行 rabbitmqctl stop_app 的动作,那么在 rabbit@node1 节点中会有如下信息:

    1
    2
    3
    4
    =INFO REPORT==== 3-Oct-2017: :11:54:01 ===
    rabbit on node rabbit@node2 down
    =INFO REPORT==== 3-Oct-2017: :11:54:01 ===
    Keep rabbit@node2 listeners: the node is already back

2.4 其他

客户端与 RabbitMQ 建立连接:

1
2
=INFO REPORT==== 14-0ct-2017: :16:24:55 ===
accepting AMQP connection <0.5865.0> (192.168.0.9:61601 -> 192.168.0.2:5672)

客户端强制中断连接时:

1
2
3
=WARNING REPORT==== 14-Jul-2017: :16:36:57 ===
closing AMQP connection <0.5909.0> (192.168.0.9:61629 -> 192.168.0.2:5672)
connection_closed_abruptly

2.5 日常积累各种操作对应日志格式

可以通过尝试各种的操作以收集相应的服务日志,之后组成一个知识集,这个知识集不单单指一个日志列表,需要通过后期的强化训练掌握其规律,让这个知识集了然于心 。在真正遇到异常故障的时候可以通过查看服务日志来迅速定位问题,之后再采取相应的措施以解决问题。

比如在执行任何 RabbitMQ 操作之前,都会打开一个新的窗口运 tail -f $RABBITMQ_HOME/var/log/rabbitmq/rabbit@$HOSTNAME.log -n 200 命令来实时查看相应操作所对应的服务日志是什么,久而久之即可在脑海中建立一个相对完备的“知识集”。

2.6 日志文件管理

RabbitMQ 中可以通过 rabbitmqctl rotate_logs {suffix} 命令来轮换日志,比如手工切换当前的日志:rabbitmqctl rotate_logs.bak ,之后可以看到在日志目录下会建立新的日志文件,并且将老的日志文件以添加 .bak 后缀的方式进行区分保存:

1
2
3
4
5
[root@node1 rabbitmq]# ls - al
-rw-r--r-- 1 root root 0 Jul 23 00:50 rabbit@node1.log
-rw-r--r-- 1 root root 22646 Jul 23 00:50 rabbit@node1.log.bak
-rw-r--r-- 1 root root 0 Jul 23 00:50 rabbit@node1-sasl.log
-rw-r--r-- 1 root root 0 Jul 23 00:50 rabbit@node1-sasl.log.bak

也可以执行一个定时任务,比如使用 Linux crontab ,以当前日期为后缀,每天执行一次切换日志的任务,这样在后面需要查阅日志的时候可以根据日期快速定位到相应的日志文件。

RabbitMQ 还可以通过程序化的方式来查看相应的日志, 默认会创建一些交换器, 其中 amq.rabbitmq.log 就是用来收集 RabbitMQ 日志的,集群中所有的服务日志都会发往这个交换器中。这个交换器的类型为 topic ,可以收集如前面所说的 debug、info、warning、error 4个级别的日志。

首先确认是否创建 amq.rabbitmq.log 交换器:

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@iZ2zeet6kto8eqx1w7sluzZ ~]# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name type
exchange_demo direct
amq.fanout fanout
amq.rabbitmq.trace topic
myAe fanout
amq.headers headers
amq.topic topic
amq.direct direct
normalExchange direct
direct
amq.match headers

配置文件 vi /etc/rabbitmq/rabbitmq.conf (没有就新建)开启:

1
log.exchange = true

重启下RabbitMQ:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# rabbitmqctl stop
Stopping and halting node rabbit@iZ2zeet6kto8eqx1w7sluzZ ...
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# rabbitmq-server restart
Configuring logger redirection

## ## RabbitMQ 3.8.8
## ##
########## Copyright (c) 2007-2020 VMware, Inc. or its affiliates.
###### ##
########## Licensed under the MPL 2.0. Website: https://rabbitmq.com

Doc guides: https://rabbitmq.com/documentation.html
Support: https://rabbitmq.com/contact.html
Tutorials: https://rabbitmq.com/getstarted.html
Monitoring: https://rabbitmq.com/monitoring.html

Logs: /var/log/rabbitmq/rabbit@iZ2zeet6kto8eqx1w7sluzZ.log
/var/log/rabbitmq/rabbit@iZ2zeet6kto8eqx1w7sluzZ_upgrade.log

Config file(s): /etc/rabbitmq/rabbitmq.conf

Starting broker... completed with 4 plugins.

可以看到交换器已启动:

分别创建四个日志队列,并采用相应的路由键来绑定 amq.rabbitmq.log 交换器,如果想用一个日志队列收集所有级别日志可以使用 # 这个路由键。

编写代码打印对应级别日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ReceiveLog {
private static final String IP_ADDRESS = "101.200.124.26";
private static final int PORT = 5672;//RabbitMQ 服务端默认端口号为 5672


public static void main(String[] args) {
try{
//创建连接
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection(addresses);

//创建对应日志级别的信道
Channel channelDebug = connection.createChannel();
Channel channelInfo = connection.createChannel();
Channel channelWarn = connection.createChannel();
Channel channelError = connection.createChannel();
channelDebug.basicQos(1);
channelInfo.basicQos(1);
channelWarn.basicQos(1);
channelError.basicQos(1);
channelDebug.basicConsume("queue.debug", false, "DEBUG", new ConsumerThread(channelDebug));
channelInfo.basicConsume("queue.info", false, "INFO ", new ConsumerThread(channelInfo)) ;
channelWarn.basicConsume("queue.warning", false, "WARNING", new ConsumerThread(channelWarn)) ;
channelError.basicConsume("queue.error", false, "ERROR", new ConsumerThread(channelError)) ;
} catch (IOException | TimeoutException ex) {
ex.printStackTrace();
}
}

public static class ConsumerThread extends DefaultConsumer {
public ConsumerThread(Channel channel){
super(channel);
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String log = new String(body);
System.out.println("=" + consumerTag + " REPORT====\n" + log);
//对日志进行处理
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
}
}

控制台打印:

1
2
3
4
5
=INFO  REPORT====
2020-10-09 16:00:19.709 [info] <0.1768.0> accepting AMQP connection <0.1768.0> (115.236.91.15:64187 -> 172.17.48.148:5672)

=INFO REPORT====
2020-10-09 16:00:19.978 [info] <0.1768.0> connection <0.1768.0> (115.236.91.15:64187 -> 172.17.48.148:5672): user 'root' authenticated and granted access to vhost '/'

要注意的是各个节点对应级别的日志是交错在一起的。可以通过检索日志的 running_partitioned_network 关键字来及时地探测到网络分区的发生,之后可以迅速采取措施以保证集群服务的鲁棒性。当然对于日志的监控处理也可以采用第3方工具实现,如 Logstash 等。

三. 单点故障恢复

3.1 什么是单点故障

对于集群层面来说,经常遇到的是单点故障。所谓的单点故障是指集群中单个节点发生了故障,有可能会引起集群服务不可用、数据丢失等异常。配置数据节点冗余(镜像队列)可以有效地防止由于单点故障而降低整个集群的可用性、可靠性。

3.2 四种常见的单点故障

单节点故障包括:

  • 机器硬件故障:包括机器硬盘、内存、主板等故障造成的死机,无法从软件角度来恢复。
    • 此时需要在集群中的其他节点中执行 rabbitmqctl forget_cluster_node {nodename} 命令来将故障节点剔除,其中 nodename 表示故障机器节点名称。
    • 如果之前有客户端连接到此故障节点上,在故障发生时会有异常报出,此时需要将故障节点的IP地址从连接列表里删除,并让客户端重新与集群中的节点建立连接,以恢复整个应用。
  • 机器掉电:需要等待电源接通之后重启机器。
    • 此时这个机器节点上的 RabbitMQ 处于 stop 状态,但是此时不要盲目重启服务,否则可能会引起网络分区
    • 此时需要在集群中的其他节点中执行 rabbitmqctl forget_cluster_node {nodename} 命令来将故障节点剔除。
    • 然后删除当前故障机器的 RabbitMQ 中的 Mnesia 数据(相当于重置)。
    • 然后再重启 RabbitMQ 服务。
    • 最后再将此节点作为一个新的节点加入到当前集群中。
  • 网络异常:网线松动或者网卡损坏都会引起网络故障的发生。
    • 对于网线松动,无论是彻底断开,还是“藕断丝连”,只要它不降速, RabbitMQ 集群就没有任何影响。但是为了保险起见,建议先关闭故障机器的 RabbitMQ 进程,然后对网线进行更换或者修复操作,之后再考虑是否重新开启 RabbitMQ 进程。
    • 网卡故障极易引起网络分区的发生,如果监控到网卡故障而网络分区尚未发生时,理应第一时间关闭此机器节点上的 RabbitMQ 进程,在网卡修复之前不建议再次开启。 如果己经发生了网络分区,可以参考 10.5 节进行手动恢复网络分区。
  • 服务进程异常:如 RabbitMQ 进程非预期终止,需要预先思考相关风险是否在可控范围之内。如果风险不可控,可以选择抛弃这个节点。一般情况下,重新启动 RabbitMQ 服务进程即可。

四. 集群迁移

扩容比较简单,一般向集群中加入新的集群节点即可,不过新的机器节点中是没有队列创建的,只有后面新创建的队列才有可能进入这个新的节点中。或者如果集群配置了镜像队列,可以通过系列操作将原先队列“漂移”到这个新的节点中,具体可以参考第 10.5 节。

迁移同样可以解决扩容的问题,将旧的集群中的数据(包括元数据信息和消息)迁移到新的且容量更大的集群中即可。 RabbitMQ 中的集群迁移更多的是用来解决集群故障不可短时间内修复而将所有的数据、客户端连接等迁移到新的集群中,以确保服务的可用性。相比于单点故障而言,集群故障的危害性就大得多,比如 IDC 整体停电、网线被挖断等。这时候就需要通过集群迁移重新建立起一个新的集群。

4.1 准备阶段-元数据重建

元数据重建是指在新的集群中创建原集群的队列交换器、绑定关系、 host、用户、权限 Parameter 等数据信息。元数据重建是集群迁移前的准备工作,之后才可将原集群中的消息及客户端连接迁移过来。

实现方式:

  • 手工创建:
  • 客户端创建:

元数据的整理十分繁琐,需要如Web管理工具的辅助:

可以在原集群上点击 Download broker definitions 按钮下载集群的元数据信息文件,此文件是 JSON 文件,比如叫 metadata.json 。

之后再在新集群上的 Web 管理界面中点击 Upload broker definitions 按钮上传 metadata.json 文件。

如果导入成功则会跳转到成功页面,这样就迅速在新集群中创建了元数据信息。如果新集群有数据与 metadata.json 中的数据相冲突,对于交换器、队列及绑定关系这类非可变对象而言会报错,而对于其他可变对象如 Parameter、用户等则会被覆盖,没有发生冲突的则不受影响。如果过程中发生错误,则导入过程终止,导致 metadata.json 中只有部分数据加载成功。

三个问题:

  1. 如果原集群突发故障,又或者开启 RabbitMQ Management 插件的那个节点机器故障不可修复,就无法获取原集群的元数据 metadata.json。

    • 这个问题很好解决,采取一个通用的备份任务,在元数据有变更或者达到某个存储周期时将最新的 metadata.json 备份至另一处安全的地方。这样在遇 到需要集群迁移时,可以获取到最新的元数据。
  2. 如果新旧集群的 RabbitMQ 版本不一致时会出现异常情况。一般情况下 RabbitMQ 是能够做到向下兼容的,在高版本的 RabbitMQ 中可以上传低版本的元数据文件。然而如果在低版本中上传高版本的元数据文件就没有那么顺利了。

    • 比如 3.5.7 版本与 3.6.10 版本的加密算法不一样,就会出现用户登录失败的情况,可以简单地在 Shell 控制台输入变更密码的方式来解决这个问题:rabbitmqctl change_password {username} {new_password}
    • 如果还是不能成功上传元数据,我们要先清楚对于用户、策略、权限这种元数据来说内容相对固定,且内容较少,手工重建的代价较小。集群中元数据最多且最复杂的要数队列、交换器和绑定这三项的内容,如果采用人工重建的方式代价太大,重建元数据的意义其实就在于重建队列、交换器及绑定这三项的相关信息。
    • 可以将 3.6.10 的元数据从 queues 这一项前面的内容,包括 rabbit_version、users、vhosts、permissions、parameters、global_parameters 和 policies 这几项内容复制后替换 3.5.7 版本中的 queues 这一项前面的所有内容,然后再保存。之后将修改并保存过后的 3.5.7 版本的元数据 JSON 文件上传到新集群 3.6.10 版本的 Web 管理界面中,至此就完成了集群的元数据重建。
  3. 第三个问题就是如果采用上面的方法将元数据在新集群上重建,则所有的队列都只会落到同一个集群节点上,而其他节点处于空置状态,这样所有的压力将会集中到这单台节点之上。

    两种解决方案,都是通过程序或脚本的形式在新集群上建立元数据:

    • 通过 HTTP API 接口创建相应的数据:

      • 引入gson:

        1
        2
        3
        4
        5
        6
        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
        </dependency>
      • 三个Bean:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        public class Queue {
        private String name;
        private String vhost;
        private Boolean durable;
        private Boolean auto_delete;
        private Map<String, Object> arguments;
        }
        public class Exchange {
        private String name;
        private String vhost;
        private String type;
        private Boolean durable;
        private Boolean auto_delete;
        private Boolean internal;
        private Map<String, Object> arguments;
        }
        public class Binding {
        private String source;
        private String vhost;
        private String destination;
        private String destination_type;
        private String routing_key;
        private Map<String , Object> arguments;
        }
      • 解析原集群的 metadata.json 文件:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        public class JsonUtil {
        private static List<Queue> queueList = new ArrayList<>();
        private static List<Exchange> exchangeList = new ArrayList<>();
        private static List<Binding> bindingList = new ArrayList<>();

        private static void parseJson(String filename) {
        JsonParser parser = new JsonParser();
        try {
        JsonObject json = (JsonObject) parser.parse(new
        FileReader(filename));
        JsonArray jsonQueueArray = json.get("queues").getAsJsonArray();
        for (int i = 0; i < jsonQueueArray.size(); i++) {
        JsonObject subObject = jsonQueueArray.get(i).getAsJsonObject();
        Queue queue = parseQueue(subObject);
        queueList.add(queue);
        }
        JsonArray jsonExchangeArray =
        json.get("exchanges").getAsJsonArray();
        for (int i = 0; i < jsonExchangeArray.size(); i++) {
        JsonObject subObject = jsonExchangeArray.get(i).getAsJsonObject();
        Exchange exchange = parseExchange(subObject);
        exchangeList.add(exchange);
        }
        JsonArray jsonBindingArray = json.get("bindings").getAsJsonArray();
        for (int i = 0; i < jsonBindingArray.size(); i++) {
        JsonObject subObject = jsonBindingArray.get(i).getAsJsonObject();
        Binding binding = parseBinding(subObject);
        bindingList.add(binding);
        }
        } catch (FileNotFoundException e) {
        e.printStackTrace();

        }
        }

        //解析队列信息
        private static Queue parseQueue(JsonObject subObject) {
        Queue queue = new Queue();
        queue.setName(subObject.get("name").getAsString());
        queue.setVhost(subObject.get("vhost").getAsString());
        queue.setDurable(subObject.get("durable").getAsBoolean());
        queue.setAuto_delete(subObject.get("auto_delete").getAsBoolean());
        JsonObject argsObject = subObject.get("arguments").getAsJsonObject();
        Map<String, Object> map = parseArguments(argsObject);
        queue.setArguments(map);
        return queue;
        }

        //解析交换器信息
        private static Exchange parseExchange(JsonObject subObject) {
        //省略,具体参考 parseQueue 方法进行推演
        Exchange exchange = new Exchange();
        exchange.setName(subObject.get("name").getAsString());
        exchange.setVhost(subObject.get("vhost").getAsString());
        exchange.setType(subObject.get("type").getAsString());
        exchange.setDurable(subObject.get("durable").getAsBoolean());
        exchange.setInternal(subObject.get("internal").getAsBoolean());
        exchange.setAuto_delete(subObject.get("auto_delete").getAsBoolean());
        JsonObject argsObject = subObject.get("arguments").getAsJsonObject();
        Map<String, Object> map = parseArguments(argsObject);
        exchange.setArguments(map);
        return exchange;
        }

        //解析绑定信息
        private static Binding parseBinding(JsonObject subObject) {
        Binding binding = new Binding();
        binding.setSource(subObject.get("source").getAsString());
        binding.setVhost(subObject.get("vhost").getAsString());
        binding.setDestination(subObject.get("destination").getAsString());
        binding.setDestination_type(subObject.get("destination_type").getAsString());
        binding.setRouting_key(subObject.get("routing_key").getAsString());
        JsonObject argsObject = subObject.get("arguments").getAsJsonObject();
        Map<String, Object> map = parseArguments(argsObject);
        binding.setArguments(map);
        return binding;
        }

        //解析参数 arguments 项内容
        private static Map<String, Object> parseArguments(JsonObject argsObject) {
        Map<String, Object> map = new HashMap<>();
        Set<Map.Entry<String, JsonElement>> entrySet = argsObject.entrySet();
        for (Map.Entry<String, JsonElement> mapEntry : entrySet) {
        map.put(mapEntry.getKey(), mapEntry.getValue());
        }
        return map;
        }
        }
      • 在解析完队列、交换器及绑定关系之后,只需要遍历 queueList、exchangeList、bindingList ,然后调用 HTTP API 创建相应的数据即可。随机挑选一个节点并明确指明了 node 节点这一参数来创建队列,如此便可解决集群内部队列分布不均匀的问题。当然首先需要确定新集群中节点名称的列表:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        92
        93
        94
        95
        96
        97
        98
        99
        100
        101
        102
        103
        104
        105
        106
        107
        108
        109
        110
        111
        112
        113
        114
        115
        116
        117
        118
        119
        120
        121
        122
        123
        124
        125
        126
        127
        128
        129
        130
        131
        132
        133
        134
        135
        136
        137
        138
        139
        140
               

        private static final String ip = "192.168.0.2";
        private static final String username = "root ";
        private static final String password = "rootl23";
        private static final List<String> nodeList =new ArrayList<String>() {{
        add("rabbit@node1");
        add("rabbit@node2");
        add("rabbit@node3");
        }};

        //创建队列
        private static Boolean createQueues() {
        try {
        for (int i = 0; i < queueList.size(); i++) {
        Queue queue = queueList.get(i);
        //注意将特殊字符转义, 比如默认的 vhost , 将其转成 %2F
        String url = String.format("http://%s:l5672/api/queues/%s/%s", ip,
        encode(queue.getVhost(), "UTF-8"),
        encode(queue.getName(), "UTF-8"));
        Map<String, Object> map = new HashMap<>();
        map.put("auto_delete", queue.getAuto_delete());
        map.put("durable", queue.getDurable());
        map.put("arguments", queue.getArguments());
        //随机挑选一个节点, 并在此节点上创建相应的队列
        // int index = (int) (Math.random() * nodeList.size());
        // map.put("node", nodeList.get(index));
        Collections.shuffle(nodeList);
        map.put("node", nodeList.get(0));
        String data = new Gson().toJson(map);
        System.out.println(url);
        System.out.println(data);
        httpPut(url, data, username, password);
        }
        } catch(IOException e){
        e.printStackTrace();
        return false;
        }
        return true;
        }

        //创建交换器
        private static Boolean createExchanges() {
        try {
        for (int i = 0; i < exchangeList.size(); i++) {
        Exchange exchange = exchangeList.get(i);
        //注意将特殊字符转义, 比如默认的 vhost , 将其转成 %2F
        String url = String.format("http://%s:l5672/api/exchanges/%s/%s", ip,
        encode(exchange.getVhost(), "UTF-8"),
        encode(exchange.getName(), "UTF-8"));
        Map<String, Object> map = new HashMap<>();
        map.put("auto_delete", exchange.getAuto_delete());
        map.put("durable", exchange.getDurable());
        map.put("type", exchange.getType());
        map.put("internal", exchange.getInternal());
        map.put("arguments", exchange.getArguments());
        //随机挑选一个节点, 并在此节点上创建相应的队列
        // int index = (int) (Math.random() * nodeList.size());
        // map.put("node", nodeList.get(index));
        Collections.shuffle(nodeList);
        map.put("node", nodeList.get(0));
        String data = new Gson().toJson(map);
        System.out.println(url);
        System.out.println(data);
        httpPut(url, data, username, password);
        }
        } catch(IOException e){
        e.printStackTrace();
        return false;
        }
        return true;
        }

        //创建绑定关系
        private static Boolean createBindings() {
        //省略, 具体参考 createQueues 方法进行推演, 关键信息如 url
        try {
        for (int i = 0; i < bindingList.size(); i++) {
        Binding binding = bindingList.get(i);
        //注意将特殊字符转义, 比如默认的 vhost , 将其转成 %2F
        String url = null;
        //绑定有两种 交换器与队列, 交换器与交换器
        if (binding.getDestination_type().equals("queue") ){
        url = String.format("http://%s:l5672/api//bindings/%s/e/%s/q/%s", ip,
        encode(binding.getVhost(), "UTF-8"),
        encode(binding.getSource(), "UTF-8"),
        encode(binding.getDestination(), "UTF-8"));
        } else {
        url = String.format("http://%s:l5672/api//bindings/%s/e/%s/e/%s", ip,
        encode(binding.getVhost(), "UTF-8"),
        encode(binding.getSource (), "UTF-8"),
        encode(binding.getDestination(), "UTF-8"));
        }
        Map<String, Object> map = new HashMap<>();
        map.put("destination", binding.getDestination());
        map.put("destination_type", binding.getDestination_type());
        map.put("routing_key", binding.getRouting_key());
        map.put("arguments", binding.getArguments());
        //随机挑选一个节点, 并在此节点上创建相应的队列
        // int index = (int) (Math.random() * nodeList.size());
        // map.put("node", nodeList.get(index));
        Collections.shuffle(nodeList);
        map.put("node", nodeList.get(0));
        String data = new Gson().toJson(map);
        System.out.println(url);
        System.out.println(data);
        httpPut(url, data, username, password);
        }
        } catch(IOException e){
        e.printStackTrace();
        return false;
        }
        return true;
        }

        // http post
        public static int httpPut(String url, String data, String username, String password) throws IOException {
        HttpClient client = new HttpClient();
        client.getState().setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials(username, password));
        PutMethod putMethod = new PutMethod(url);
        putMethod.setRequestHeader("Content-Type", "application/json;charset=UTF-8");
        putMethod.setRequestEntity(new StringRequestEntity(data, "application/json", "UTF-8"));
        int statusCode = client.executeMethod(putMethod);
        //System.out.println(statusCode);
        return statusCode;
        }

        public static int httpPost(String url, String data, String username, String password) throws IOException {
        //省略, 具体参考 httpPut 方法进行推演
        HttpClient client = new HttpClient();
        client.getState().setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials(username, password));
        PostMethod postMethod = new PostMethod(url);
        postMethod.setRequestHeader("Content-Type", "application/json;charset=UTF-8");
        postMethod.setRequestEntity(new StringRequestEntity(data, "application/json", "UTF-8"));
        int statusCode = client.executeMethod(postMethod);
        //System.out.println(statusCode);
        return statusCode;
        }

        通过使用 Gson 解析 metadata.json 文件,进而使用 HttpClient 调用相应的 HTTP API 在随机的节点上创建相应的队列进程,从而达到了集群节点负载均衡的目的。

      • HttpClient需要引入依赖:

        1
        2
        3
        4
        5
        6
        <!-- https://mvnrepository.com/artifact/commons-httpclient/commons-httpclient -->
        <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.1</version>
        </dependency>
    • 通过随机连接集群中不同的节点的IP地址,然后再创建队列。与前一种方式需要节点名称的列表不同,这里需要的是节点IP地址列表:

      • ```java
        private static List ipList = new ArrayList()/{/{
        add("192.168.0.2");
        add("192.168.0.3");
        add("192.168.0.4"); 
        
        /}/};
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56

        * 客户端通过连接不同的IP地址来创建不同的 connection 和 channel ,然后将 channel 存入一个缓冲池,之后从 channelList 中获取一个 channel ,再根据 queueList 中的信息创建相应的队列。

        ![](https://pic-1258215793.cos.ap-shanghai.myqcloud.com/content/20201001/202010010119.png)

        * 每一个 channel 对应一个 connection,而每一个 connection 又对应一个 IP ,这样串起来就能保证 channelList 中不会遗留任何节点,最终实现与第一种方式相同的功能。对应的队列创建代码:

        ```java
        private static void createQueuesNew() {
        List<Channel> channelList =new ArrayList<>();
        List<Connection> connectionList =new ArrayList<>();
        try {
        for (int i = 0;i < ipList.size();i++) {
        String ip = ipList.get(i);
        ConnectionFactory connectionFactory =new ConnectionFactory();
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setHost(ip);
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channelList.add(channel) ;
        connectionList.add(connection);
        }
        createQueueByChannel(channelList);
        } catch (IOException e) {
        e.printStackTrace() ;
        } catch (TimeoutException e) {
        e.printStackTrace();
        } finally {
        for (Connection connection : connectionList) {
        try {
        connection.close();
        } catch (IOException e) {
        e.printStackTrace();
        }
        }
        }
        }

        private static void createQueueByChannel(List<Channel> channelList) {
        for (int i = 0; i < queueList.size(); i++) {
        Queue queue = queueList.get(i);
        //随机获取相应的 channel
        Collections.shuffle(channelList);
        Channel channel = channelList.get(0);
        try {
        Map<String, Object> mapArgs = queue.getArguments();
        //do something with mapArgs.
        channel.queueDeclare(queue.getName(), queue.getDurable(),
        false, queue.getAuto_delete(), mapArgs);
        } catch (IOException e) {
        e.printStackTrace();
        }
        }
        }

4.2 数据迁移和客户端连接的切换

首先需要将生产者的客户端与原 RabbitMQ 集群的连接断开,然后再与新的集群建立新的连接,这样就可以将新的消息流转入到新的集群中。

之后就需要考虑消费者客户端的事情,一 种是等待原集群中的消息全部消费完之后再将连接断开,然后与新集群建立连接进行消费作业。

当原集群服务不可用或者出现故障造成服务质量下降而需要迅速将消息流切换到新的集群中时,此时就不能等待消费完原集群中的消息,这里需要及时将消费者客户端的连接切换到新的集群中,那么在原集群中就会残留部分未被消费的消息,此时需要做进一步的处理。如果原集群损坏,可以等待修复之后将数据迁移到新集群中,否则会丢失数据。

数据迁移的主要原理是先从原集群中将数据消费出来,然后存入一个缓存区中,另一个线程读取缓存区中的消息再发布到新的集群中,如此便完成了数据迁移的动作。

RabbitMQ提供的 Federation 和 Shovel 都可以实现 ForwardMaker 功能。

4.3 自动化迁移

要实现集群自动化迁移,需要在使用相关资源时就做好一些准备工作,方便在自动化迁移过程中进行无缝切换。

4.3.1 使用资源的3个部分

与生产者和消费者客户端相关的是交换器、队列及集群的信息,如果这种类型的资源发生改变时需要让客户端迅速感知,以便进行相应的处理,则可以通过将相应的资源加载到 ZooKeeper 的相应节点中,然后在客户端为对应的资源节点加入 watcher 来感知变化, 当然这个功能使用 etcd 或者集成到公司层面的资源配置中心中会更加标准、高效。

如图将整个 RabbitMQ 集群资源的使用分为3个部分:

  • 客户端、
  • 集群、
  • ZooKeeper 配置管理。

4.3.2 自动化迁移过程

在集群中创建元数据资源时都需要在 ZooKeeper 中生成相应的配置:

  • 比如在 cluster1 集群中创建交换器 exchange1 之后,需要在 /rmqNode/exchanges 路径下创建实节点 exchange1。并赋予节点的数据内容为:

    1
    2
    3
    4
    5
    cluster=cluster1 # 表示此交换器所在的集群名称
    exchangeType=direct # 表示此交换器的类型
    vhost=vhost1 # 表示此交换器所在的 vhost
    username=root # 表示用户名
    password=root123 # 表示密码
  • 在 cluster1 集群中创建队列 queue1 之后,需要在 /rmqNode/queues 路径下创建实节点 queue1 ,并赋予节点的数据内容为:

    1
    2
    3
    4
    5
    6
    cluster=cluster1
    bindings=exchange1 # 表示此队列所绑定的交换器
    # 如果有需要,也可以添加一些其他信息,比如路由键等
    vhost=vhost1
    username=root
    password=root123
  • 对应集群的数据在 /rmqNode/clusters 路径下,比如 cluster 集群,其对应节点的数据内容包含 IP 地址列表信息:

    1
    ipList=192.168.0.2, 192.168.0.3, 192.168.0.4 # 集群中各个节点的 IP 地址信息

客户端程序如果与其上的交换器或者队列进行交互,那么需要在相应的 ZooKeeper 节点中添加 watcher ,以便在数据发生变更时进行相应的变更,从而达到自动化迁移的目的。

生产者客户端

  • 在发送消息之前需要先连接 ZooKeeper,
  • 然后根据指定的交换器名称如 exchange1 找到相应的路径 /rmqNode/exchanges 中寻找 exchange1 的节点,
  • 之后再读取节点中的数据,井同时对此节点添加 watcher 。
  • 在节点的数据第一条 cluster=cluster1 中找到交换器所在的集群名称,
  • 然后再从路径 /rmqNode/clusters 中寻找 cluster1 节点,
  • 然后读取其对应 IP 地址列表信息。
  • 这样整个发送端所需要的连接串数据 (IP地址列表、vhost、usemame、password 等)都己获取,接下就可以与 RabbitMQ 集群 cluster 建立连接然后发送数据了。

对于消费者客户端而言:

  • 同样需要连接 ZooKeeper ,
  • 之后根据指定的队列名称(queue1) 到相应的路径 /rmqNode/queues 中寻找 queue1 节点,
  • 继而找到相应的连接串,
  • 然后与 RabbitMQ 集群 cluster1 建立连接进行消费。
  • 当然对 /rmqNode/queues/queue1 节点的 watcher 必不可少。

当 cluster1 集群需要迁移到 cluster2 集群时:

  • 首先需要将 cluster1 集群中的元数据在 cluster2 集群中重建。
  • 之后通过修改 channel 和 queue 元数据信息:
    • 比如原 cluster1 集群中有交换 exchange1、exchange2 和队列 queue1、queue2 ,
    • 现在通过脚本或者程序将其中的 cluster=cluster1 数据修改为 cluster=cluster2
    • 客户端会立刻感知节点的变化,然后迅速关闭当前连接之后再与新集群 cluster2 建立新的连接后生产和消费消息,在此切换客户端连接的过程中是可以保证数据零丢失的。
  • 迁移之后,生产者和消费者都会与 cluster2 集群进行互通,此时原 cluster 集群中可能还有未被消费完的数据,此时需要使用 RabbitMQ ForwardMaker 工具将 cluster1 集群中未被消费完的数据同步到 cluster2 集群中。

如果没有准备 RabbitMQ ForwardMaker 工具,也不想使用 Federation 或者 Shovel 插件,那么在变更完交换器相关的 ZooKeeper 中的节点数据之后,需要等待原集群中的所有队列都消费完全之后,再将队列相关的 ZooKeeper 中的节点数据变更,进而使得消费者的连接能够顺利迁移到新的集群之上。可以通过下面的命令来查看是否有队列中的消息未被消费完:

rabbitmqctl list_queues -p / -q | awk '{if($2>0) print $0 }'

4.3.3 空闲备份集群解决方案

上面的自动化迁移立足于将现有集群迁移到空闲的备份集群,如果由于原集群硬件升级等原因迁移也无可厚非。很多情况下,自动化迁移作为容灾手段中的一种,如果有很多个正在运行的 RabbitMQ 集群,为每个集群都配备一个空闲的备份集群无疑是一种资源的浪费。当然可以采取几个集群共用一个备份集群来减少这种浪费,那么有没有更优的解决方案呢?

就以4个 RabbitMQ 集群为例,其被分配4个独立的业务使用。如图 7-8 所示, 当 cluster1 集群中的元数据备份到 cluster2 集群中,而 cluster2 集群中的元数据备份到 cluster3 集群中,如此可以两两互备。比如在 cluster1 集群中创建了一个交换器 exchange1 ,此时需要在 cluster2 集群中同样创建一个交换器 exchange1 。在正常情况下,使用的是 cluster1 集群中的 exchange1 ,而 exchange1 在 cluster2 集群中只是一份记录,并不消耗 cluster2 集群的任何性能。而当需要将 cluster1 迁移时,只需要将交换器及队列相对应的 ZooKeeper 节点数据项变更即可完成迁移的工作。如此既不用耗费额外的硬件资源,又不用再迁移的时候重新建立元数据信息。

为了更加稳妥起见,也可以准备一个空闲的备份集群以备后用。当 cluster1 集群需要迁移 cluster2 集群中时, cluster2 集群己经发生故障被关闭或者被迁移到 cluster3 集群中了,那么这个空闲的备份集群可以当作 Plan B 来增强整体服务的可靠性。如果既想不浪费多余的硬件资源又想具备更加稳妥的措施,可以参考图 7-9 ,将 cluster1 中的元数据备份到 cluster2、cluster3 中,这样以1备2的方式即可解决这个难题。

对于上面介绍的多集群间互备的解决方案需要配套一个完备的实施系统,比如具备资源管理、执行下发、数据校对等功能,并且对于 ZooKeepe 节点中的数据项设计也需要细细斟酌,最好能够根据实际使用情况将这些整合到一个大的平台化系统之中。

五. 集群监控

监控不仅可以提供运行时的数据为应用提供依据参考,还可以迅速定位问题、提供预防及告警等功能,很大程度上增强了整体服务的鲁棒性。

RabbitMQ Management 插件就能提供一定的监控功能:如发送速度、确认速度、消费速度、消息总数、磁盘读写速度、句柄数、Socket 连接数、 Connection 数、 Channel 数、内存信息等。但是有一个遗憾就是其难以和公司内部系统平台关联,对于业务资源的使用情况、相应的预防及告警的联动无法顺利贯通。如果在人力、物力等条件允许的情况下,自定义一套监控系统非常有必要。

5.1 通过HTTP API接口提供监控数据

RabbitMQ Management 插件提供了HTTP API接口来提供监控数据。

假设集群中一共有4个节点:nod1、node2、node3、node4 。有一个交换器 exchange 通过同一个路由键“rk”绑定了3个队列 quue1、queue2 和 queue3。

首先可以通过 /api/nodes 接口来收集集群节点的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ClusterNode {
private long diskFree;//磁盘空闲
private long diskFreeLimit;
private long fdUsed;//句柄使用数
private long fdTotal;
private long socketsUsed;//Socket 使用数
private long socketsTotal;
private long memoryUsed;//内存使用值
private long memoryLimit;
private long procUsed;//Erlang 进程使用数
private long procTotal;

@Override
public String toString () {
return "{disk_free=" + diskFree + ", " +
"disk_free_limit=" + diskFreeLimit + ", " +
"fd_used= " + fdUsed + ", " +
"fd_total = " + fdTotal + ", " +
"sockets_used= " + socketsUsed + ", " +
"sockets_total= " + socketsTotal + ", " +
"mem_used= " + memoryUsed + ", " +
"mem_limit= " + memoryLimit + ", " +
"proc_used=" + procUsed + ", " +
"proc_total=" + procTotal + "}";
}
......
}

封装一下HTTP GET,方便后续程序直接调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class HttpUtils {
public static String httpGet(String url, String username, String password)
throws IOException {
HttpClient client = new HttpClient();
client.getState().setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
GetMethod getMethod = new GetMethod(url) ;
int ret = client.executeMethod(getMethod);
String data = getMethod.getResponseBodyAsString();
System.out.println(data);
return data;
}
}

通过 HTTP GET 方法获取 http://xx.xxx.xxx.xxx:15672/api/nodes 的 JSON 数据,然后通过 GSON 进行解析, 之后即可采集到感兴趣的数据项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static List<ClusterNode> getClusterData(String ip, int port,
String username, String password) {
List<ClusterNode> list = new ArrayList<>();
String url = "http://" + ip + ":" + port + "/api/nodes";
System.out.println(url);
try {
String urlData = HttpUtils.httpGet(url, username, password);
parseClusters(urlData, list);
} catch(IOException e) {
e.printStackTrace();
}
System.out.println(list);
return list;
}


private static void parseClusters(String urlData, List<ClusterNode> list) {
JsonParser parser= new JsonParser();
JsonArray jsonArray =(JsonArray) parser.parse(urlData);
for(int i= 0 ;i < jsonArray.size(); i++) {
JsonObject jsonObjectTemp = jsonArray.get(i).getAsJsonObject();
ClusterNode cluster = new ClusterNode();
cluster.setDiskFree(jsonObjectTemp.get("disk_free").getAsLong());
cluster.setDiskFreeLimit(jsonObjectTemp.get(" disk_free_limit").
getAsLong());
cluster.setFdUsed(jsonObjectTemp.get("fd_used").getAsLong());
cluster.setFdTotal(jsonObjectTemp.get("fd_total").getAsLong() );
cluster.setSocketsUsed(jsonObjectTemp.get("sockets_used").getAsLong());
cluster.setSocketsTotal(jsonObjectTemp.get("sockets_total").getAsLong());
cluster.setMemoryUsed(jsonObjectTemp.get("mem_used").getAsLong() );
cluster.setMemoryLimit(jsonObjectTemp.get("mem_limit").getAsLong() );
cluster.setProcUsed(jsonObjectTemp.get("proc_used").getAsLong());
cluster.setProcTotal(jsonObjectTemp.get(" proc_total").getAsLong() );
list.add(cluster);
}
}

数据来集完之后并没有结束,图 7-10 中简单囊括了从数据采集到用户使用的过程:

  • 首先采集程序通过定时调用 HTTP API 接口获取 JSON 数据,
  • 然后进行 JSON 解析之后再进行持久化处理。
  • 对于这种基于时间序列的数据非常适合使用 OpenTSDB(基于 Hbase 的分布式的、可伸缩的时间序列数据库。主要用途就是做监控系统,比如收集大规模集群,包括网络设备、操作系统、应用程序的监控数据并进行存储、查询)来进行存储。
  • 监控管理系统可以根据用户的检索条件来从 OpenTSDB 获取相应的数据并展示到页面之中。
  • 监控管理系统本身还可以具备报表、权限管理等功能,同时也可以实时读取所采集的数据,对其进行分析处理,对于异常的数据需要及时报告给相应的人员。

对于集群的各节点信息展示可以参考下方,图 7-11 展示了各个节点实时的内存占用情况,图 7-12 展示了各个节点实时的磁盘使用情况。

对于交换器而言的数据采集可以调用 /api/exchanges/vhost/name 接口,比如需要调用虚拟主机为默认的 / 、交换器名称为 exchange 的数据,只需要使用 HTTP GET 方法获取 http://xxx.xxx.xxx.xxx:15672/api/exchanges/%2F/exchange 的数据即可。注意,这里需要将 / 进行 HTML 转义成 %2F ,否则会出错。对应的数据内容可以参考下方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"message_stats":{
"publish_in_details": {
"rate": 0.4 //数据流入的速率
},
"publish_in": 9,//数据流入的总量
"publish_out_details": {
"rate": 1.2 //数据流出的速率
},
"publish_out": 27//数据流出的总量
},
"outgoing": [],
"incoming": [],
"arguments": {},
"internal": false,
"auto_delete": false,
"durable": true,
"type": "direct",
"vhost": "/",
"name": "exchange"
}

对于1个交换器绑定3个队列的情况,向交换器发送1条消息,那么流入就是1条,而流出就是3条。在应用的时候根据实际情况挑选数据流入速率或者数据流出速率作为发送数量, 以及挑选数据流入的量还是数据流出的量作为发送量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class JQExchange {
private double publishInRate;
private long publishIn;
private double publishOutRate;
private long publishOut;
@Override
public String toString() {
return "{publish_in_rate=" + publishInRate +
", publish_in" + publishIn +
", publish_out_rate=" + publishOutRate +
", publish_out=" + publishOut + "}";
}
......
}
public class ExchangeMonitor {
public static void main(String[] args) {
try {
getExchangeData("192.168.0.2", 15672, "root", "rootl23", "/", "exchange");
} catch (IOException e) {
e.printStackTrace();
}
}

public static JQExchange getExchangeData(String ip, int port, String username,
String password, String vhost , String exchange) throws IOException {
String url = "http://" + ip + ":" + port + "/api/exchanges"
+ encode(vhost, "UTF-8") + "/" + encode(exchange, "UTF-8");
System.out.println(url);
String urlData = HttpUtils.httpGet(url, username, password);
System.out.println(urlData);
JQExchange exchangeAns = parseExchange(urlData);
System.out.println(exchangeAns);
return exchangeAns;
}

private static JQExchange parseExchange(String urlData) {//解析程序
JQExchange exchange = new JQExchange();
JsonParser parser = new JsonParser();
JsonObject jsonObject = (JsonObject) parser.parse(urlData);
JsonObject msgStats =
jsonObject.get("message_stats").getAsJsonObject();
double publish_in_details_rate =
msgStats.get("publish_in_details")
.getAsJsonObject().get("rate").getAsDouble();
double publish_out_details_rate =
msgStats.get("publish_out_details").
getAsJsonObject().get("rate").getAsDouble();
long publish_in = msgStats.get("publish_in").getAsLong();
long publish_out = msgStats.get("publish_out").getAsLong();
exchange.setPublishInRate(publish_in_details_rate);
exchange.setPublishOutRate(publish_out_details_rate);
exchange.setPublishIn(publish_in);
exchange.setPublishOut(publish_out);
return exchange;
}
}

对于队列而言的数据来集相关的接口为 /api/queues/vhost/name 。

5.2 通过客户端提供监控数据

除了 HTTP API 接口可以提供监控数据,Java 版客户端从 3.6.x 版本开始,也在 Channel 接口中提供了两个方法来获取数据:

1
2
3
long messageCount(String var1) throws IOException;

long consumerCount(String var1) throws IOException;
  • messageCount:用来查询队列中的消息个数,可以为监控消息堆积的情况提供数据。
  • consumerCount:用来查询队列中的消费者个数,可以为监控消费者的情况提供数据。

相应监控视图:

还可以通过连接的状态进行监控,Connection 接口提供:

1
2
3
4
5
6
7
void addBlockedListener(BlockedListener var1);

BlockedListener addBlockedListener(BlockedCallback var1, UnblockedCallback var2);

boolean removeBlockedListener(BlockedListener var1);

void clearBlockedListeners();
  • addBlockedListener:用来监昕连接阻塞信息。
  • addShutdownListener:用来监昕连接关闭信息。

用户客户端还可以自行定义一些数据进行埋点 ,比如客户端成功发送的消息个数和发送失败的消息个数,进一步可以计算发送消息的成功率等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static volatile int successCount = 0; //记录发送成功的次数
public static volatile int failureCount = 0; //记录发送失败的次数

public void XXXXXX(Channel channel) {
try {
channel.confirmSelect();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
failureCount++;
}
});
channel.basicPublish("", "", true,
MessageProperties.PERSISTENT_TEXT_PLAIN,
"msg" .getBytes());
if (channel.waitForConfirms() == true) {
successCount++;
} else {
failureCount++;
}
} catch (IOException e) {
e.printStackTrace();
failureCount++;
} catch (InterruptedException e){
e.printStackTrace();
failureCount++;
}
}

里推荐引入 metrics 工具(比如 com.codahale.metrics.*)来进行埋点,这样既方便又高效。同样的方式也可以统计消费者消费成功的条数和消费失败的条数。

5.3 检测RabbitMQ服务是否健康

上述两种方式都要基于 RabbitMQ 服务运行正常的情况下,但无法判断 RabbitMQ 是否具备服务外部请求的能力。

三种检查方式:

  • 检查RabbitMQ是否运行:ps aux | grep rabbitmq
  • 检查5672端口是否开启:telnet xxx.xxx.xxx.xxx 5672
  • 使用 AMQP 协议来构建一个类似于 TCP 协议中的 Ping 的检测程序。当这个测试程序与 RabbitMQ 服务无法建立 TCP 协议层面的连接,或者无法构建 AMQP 协议层面的连接,再或者构建连接超时时,则可判定 RabbitMQ 服务处于异常状态而无法正常为外部应用提供相应的服务。

AMQPPing实现代码可以参考《RabbitMQ实战指南 7.5.3》。

RabbitMQ Management 插件提供了 /api/aliveness-test/vhost 的 HTTP API 形式的接口,通过3个步骤来验证 RabbitMQ 服务的健康性:

  • 创建1个以 aliveness-test 为名称的队列来接收测试消息。
  • 用队列名称 aliveness-test 作为消息的路由键,将消息发往默认交换器。
  • 到达队列时就消费该消息,否则就报错。

检测程序 aliveness-test 运行在 Erlang 虚拟机内部, 因此它不会受到网络问题的影响。

5.4 元数据管理与监控

确保 RabbitMQ 能够健康运行还不足以让人放松警惕。比如在生产环境误删了一个队列(或者删除交换器、修改绑定信息等),若业务方正在使用这个队列,返回了异常后即使处理还能尽量的减少影响。但如果是深夜执行的定时任务,处理起来就很麻烦了。

许多应用场景是在业务逻辑代码中创建相应的元数据资源(交换器、队列及绑定关系)并使用对于排他的、自动删除的这类非高可靠性要求的元数据资源可以在一定程度上忽略元数据变更的影响。但是对于两个非常重要的且通过消息中间件交互的业务应用,在使用相应的元数据资源时最好进行相应的管控,如果一方或者其他方肆意变更所使用的元数据,必然对另一方造成不小的损失。管控的介入自然会降低消息中间件的灵活度,但是可以增强系统的可靠性。 比如通过专用的“元数据审核系统”来配置相应的元数据资源,提供给业务方使用的用户只有 可读和可写的权限,这样可以进一步降低风险。

RabbitMQ 在创建元数据资源的时候是以一种声明的形式完成的:无则创建、有则不变,不过在对应的元数据存在的情况下,对其再次声明时使用不同的属性会报出相应的错误信息。 我们可以利用这一特性来监控元数据的变更,通过定时程序来将记录中的元数据信息重新声明一次,查看是否有异常报出。不过这种方法非常具有局限性,只能增加元数据的信息而不能减少。比如有一个队列没有消费者且以后也不会被使用,我们对其进行了解绑操作,这样就没有更多的消息流入而造成消息堆积,不过这一变更由于某些局限性没有及时将记录变更以通知到那个定时程序,此时又重新将此队列绑定到原交换器中。

如图 7-15 所示,所有的业务应用都需要通过元数据审核系统来申请创建(当然也可以包含查询、修改及删除)相应的元数据信息。在申请动作完成之后,由专门的人员进行审批,之后在数据库中存储和在 RabbitMQ 集群中创建相应的元数据,这两个步骤可以同时进行,而且也无须为这两个动作添加强一致性的事务逻辑。在数据库和 RabbitMQ 集群之间会有一个元数据一致性校验程序来检测元数据不一致的地方,然后将不一致的数据上送到监控管理系统。监控管理系统中可以显示元数据不一致的记录信息,也可以以告警的形式推送出来,然后相应的管 理人员可以选择手动或者自动地进行元数据修正。这里的不一致有可能是由于数据库的记录未被正确及时地更新,有可能是 RabbitMQ 集群中元数据被异常篡改 元数据修正需慎之又慎,在整个系统修正逻辑完备之前,建议优先采用人工的方式,毕竟不一致的元数据仅占少数,人工修正的工作量并不太大。

主要的元数据是:queues、exchanges、bindings ,可以分别建立三张表。元数据一致性检测程序可以通过 /api/definitions 的 HTTP API 接口获取集群的元数据信息,通过解析之后与数据库中的记录一一比对,查看是否有不一致的地方。


参考:

🔗 《RabbitMQ实战指南》