RabbitMQ(七)Federation与Shovel

RabbitMQ(七)集群

一. Federation

Federation 插件的设计目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而无须建立集群

适用场景下:

  • Federation 插件能够在不同管理域(可能设置了不同的用户和 vhost ,也可能运行在不同版本的 RabbitMQ Erlang 上)中的 Broker 或者集群之间传递消息。
  • Federation 插件基于 AMQP 0-9-1 协议在不同的 Broker 之间进行通信,并设计成能够容忍不稳定的网络连接情况。
  • 一个 Broker 节点中可以同时存在联邦交换器(或队列)或者本地交换器(或队列),只需要对特定的交换器(或队列)创建 Federation 连接(Federation link )
  • Federation 需要在 Broker 节点之间创建 O(N^2^)个连接(尽管这是最简单的使用方式),这也就意味着 Federation 在使用时更容易扩展。

Federation 插件可以让多个交换器或者多个队列进行联邦。一个联邦交换器(federated exchange)或者一个联邦队列(federated queue)接收上游(upstream)的消息,这里的上游是指位于其他 Broker 上的交换器或者队列

  • 联邦交换器能够将原本发送给上游交换器(upstream exchange)的消息路由到本地的某个队列中
  • 联邦队列则允许一个本地消费者接收到来自上游队列(upstream queue)的消息

1.1 联邦交换器

1.1.1 案例:Federation优化服务器通信网络延迟问题

例如有如下集群:broker1部署于北京,broker2部署于上海,broker3部署于广州,因为物理距离的问题,所以要考虑网络延迟问题。

有一个广州的业务ClientA需要连接 broker3 ,并向其中的交换器 exchangeA 发送消息,此时的网络延迟很小,ClientA 可以迅速将消息发送至 exchangeA 中,就算在开启了 publisher confirm 机制或者事务机制的情况下,也可以迅速收到确认信息。

此时又有一个在北京的业务 ClientB 需要向 exchangeA 发送消息,那么 ClientB 与 broker3 之间有很大的网络延迟, ClientB 将发送消息至 exchangeA 会经历一定的延迟,尤其是在开启了 publisher confirm 机制或者事务机制的情况下, ClientB 会等待很长的延迟时间来接收 broker3 的确认信息,进而必然造成这条 发送线程的性能降低,甚至造成一定程度上的阻塞。

那么要怎么优化业务 ClientB 呢?将业务 ClientB 部署到广州的机房中可以解决这个问题,但是如果 ClientB 调用的另一些服务都部署在北京,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现?这里使用 Federation 插件就可以很好地解决这个问题

如图 8-2 所示,在广州的 broker3 中为交换器 exchangeA(broker3 中的队列 queueA 通过 rkA 与 exchangeA 进行了绑定)与北京的 broker1 之间建立一条单向的 Federation link :

  • 此时 Federation 插件会在 broker1 上建立一个同名的交换器 exchangeA(这个名称可以配置,默认同名),
  • 同时建立一个内部的交换器 exchangeA->broker3 B ,并通过路由键 rkA 将这两个交换器绑定起来。
  • 这个交换器 exchangeA->broker3 B 名字中的 broker3 是集群名,可以通过 rabbitmqctl set_cluster_name {new name} 命令进行修改。
  • 与此同时 Federation 插件还会在 broker1 上建立一个队列 federation: exchangeA->broker3 B 并与交换器 exchangeA->broker3 B 进行绑定。
  • Federation 插件会在队列 federation: exchangeA->broker3 B 与 broker3 中的交换器 exchangeA 之间建立一条 AMQP 连接来实时地消费队列 federation: exchangeA->broker3 B 中的数据。
  • 这些操作都是内部的,对外部业务客户端来说这条 Federation link 建立在 broker1 的 exchangeA 和broker3 的 exchangeA 之间。

因此部署在北京的业务 ClientB 可以连接 broker1(北京)并向 exchangeA 发送消息,这样 ClientB 可以迅速发送完消息并收到确认信息,而之后消息通过 Federation link 转发到 broker3(广州)的交换器 exchangeA 中。最终消息会存入与 exchangeA 绑定的队列 queueA 中,消费者最终可以消费队列 queueA 中的消息。

经过 Federation link 转发的消息会带有特殊的 headers 性标记。例如向 broker1 中的交换器 exchangeA 发送一条内容为“federation test payload ”的持久化消息,之后可以在 broker3 中的队列 queueA 消费到这条消息:

1.1.2 联邦交换器和普通无明显差异

Federation 不仅便利于消息生产方,同样也便利于消息消费方。假设某生产者将消息存入 broker1 中的某个队列 queueB,在广州的业务 ClientC 想要消费 queueB 消息,消息的流转及确认必然要忍受较大的网络延迟,内部编码逻辑也会因这一因素变得更加复杂,这样不利于 ClientC 的发展。

不如将这个消息转发的过程以及内部复杂的编程逻辑交给 Federation 去完成, 而业务方在编码时不必再考虑网络延迟的问题。 Federation 使得生产者和消费者可以异地部署而又让这两方感受不到过多的差异(类似RPC的特性?)。

上个案例的队列 federation: exchangeA->broker3 B 是一个相对普通的队列,可以直接通过客户端进行消费。假设此时还有一个客户端 ClientD 通过 Basic.Consume 来消费队列 federation: exchangeA->broker3 B 的消息,会导致发往 broker1 的 exchangeA 中的消息会有一半被 ClientD 消费掉,另一半会发往 broker3 的 exchangeA 。如果业务方要求所有发往 broker1 的 exchangeA 中的消息都要转发到 broker3 的 exchangeA 中,要注意不要使队列 federation: exchangeA->broker3 B 有其他消费者

但也可以注意到,队列 federation: exchangeA->broker3 B 本身就是普通的队列,天然支持“异地均摊消费”类似的需求。一个 federated exchange 同样可以成为另一个交换器的 upstream exchange 。

两方的交换器可以互为 federated exchange 和 upstream exchange 其中 max_hops=1 表示一条消息最多被转发的次数为1。

注意:对于默认的交换器(每个 vhost 下都会默认创建一个名为“”的交换 器)和内部交换器而言,不能对其使用 Federation 功能

1.1.3 联邦交换器组成复杂的拓扑逻辑部署方式

“fan-out”的多叉树形式:

“三足鼎立”形式:

环形的拓扑部署:

1.2 联邦队列

联邦队列(federated queue):可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。

图 8-9 演示了两个Broker中的联邦队列(灰色)和非联邦队列(白色)部署:queue1 和 queue2 位于 broker2,配置为联邦队列并设置 broker1 为上游, Federation 插件会在 broker1 创建两个同名队列,并且分别和 broker2 的队列建立单向独立的 Federation link 。

当有消费者 ClientA 连接 broker2 并通过 Basic.Consume 消费队列 queue1 (或 queue2 )中的消息时:

  • 如果此时队列已有消息堆积就可以直接被消费,且 broker2 的队列不会拉取 broker1 的队列中的消息;
  • 如果此时队列没有消息,会通过 Federation link 拉取在 broker1 的上游队列中的消息,然后存储到本地,再被 ClientA 消费。

既可以消费联邦队列,又可以消费上游队列,这种分布式队列的部署可以提高单个队列的容量。如果上游一端部署的消费者来不及消费上游队列的消息,下游的消费者可以帮其分担消费,有一定的负载均衡的效果。

与联邦交换器不同的是,一条消息可以在联邦队列间转发无限次,因为队列可以互为联邦队列:消息会转向有多余消费能力的一方,所以可能会导致消费在队列间来回转发。

如下图,broker2 的队列 queue 没有消息堆积或者消息被消费完之后并不能通过 Basic.Get 来获取 broker1 中队列 queue 消息。因为 Basic.Get 是一个异步的方法,如果要从 broker1 中队列 queue 拉取消息,必须要阻塞等待通过 Federation link 拉取消息存入 broker2 中的队列 queue 之后再消费消息,所以对于 federated queue 而言只能使用 Basic.Consume 进行消费

联邦队列不具备传递性:队列 queue2 作为联邦队列与 queue1 进行联邦,而队列 queue2 又作为队列 queue3 的上游队列,但是这样队列 queue1 和 queue3 之间并没有产生任何联邦的关系。如果队列 queue1 有消息堆积,消费者连接 broker3 消费 queue3 中的消息,无论 queue3 处于何种状态,这些消费者都消费不到 queue1 中的消息, 除非 queue2 有消费者。

1.3 Federation的使用

使用Federation前要配置两个内容:

  • 需要配置一个或多个 upstream,每个 upstream 均定义了到其他节点的 Federation link。这个配置可以通过设置运行时的参数(Runtime Parameter)来完成,也可以通过 federation management 插件来完成。
  • 需要定义匹配交换器或者队列的一种/多种策略(Policy)。

1.3.1 启用Federation

开启Federation功能:

1
2
$ rabbitmq-plugins enable rabbitmq_federation 
......

开启Federation的管理插件:

1
2
$ rabbitmq-plugins enable rabbitmq_federation_management
......

开启后在RabbitMQ的管理界面中【Admin】右侧会多出两个Tab页:

注意:当需要在集群中使用 Federation 功能的时候,集群中所有的节点都应该开启 Federation 插件

1.3.2 相关配置和工具

有关 Federation upstream 信息全部都保存在 RabbitMQ 的 Mnesia 数据库中,包括用户信息、权限信息、队列信息等。

在Federation中有三种级别的配置:

  • Upstreams:每个 upstream 用于定义与其他 Broker 建立连接的信息。 属于运行时参数。
  • Upstream sets:每个 upstream set 用于对一系列使用 Federation 功能的 upstream 行分组。 实际的简单使用场景可忽略,所有upstreambuild添加到一个名叫 all 且隐式定义的 upstream set 中。
  • Policies:每一个 Policy 会选定出一组交换器,或者队列,亦或者两者皆有而进行限定,进而作用于一个单独的 upsteam 或者 upstream set 之上。

Federation 相关的运行时参数和策略都可以通过3种方式进行设置:

  • 通过 rabbitmqctl 工具;
  • 通过 RabbitMQ Management 插件提供的 HTTP API 接口;
  • 通过 rabbitmq federation management 插件提供的 Web 管理界面的方式(最方便且通用)。不过基于 Web 管理界面的方式不能提供全部功能,比如无法针对 upstream set 进行管理。

1.3.3 使用流程

以图 8-2 中broker1和broker3为例描述如何建立联邦交换器

  • 第一步,在 broker1 和 broker3 中开启 rabbitmq_federation 插件,最好同时开启 rabbitmq_federation_management 插件。

  • 第二步,在 broker3 中定义一个 upstream,有三种设置方式:

    • 第一种是通过 rabbitmqctl 工具的方式:

      1
      2
      $ rabbitmqctl set_parameter federation-upstream f1
      uri '{"uri":"amqp://root:root123 192.168.0.2:5672","ack-mode":"on-confirm"}'
    • 第二种是通过调用 HTTP API 接口的方式:

      1
      2
      3
      $ curl -i -u root:rootl23 -XPUT -d
      '{"value":{"uri":"amqp://root:root123 192.168.0.2:5672","ack-mode":"on-confirm"}}'
      http://192.168.0.4:15672/api/parameters/federation-upstream/%2f/f1
    • 第三种是通过在Web管理界面中添加的方式,【Admin】->【Federation Upstreams】->【Add a new upstream】,各个参数的含义:

      • Name :定义这个 upstream 的名称,必填项。

      • URI (uri):定义 upstream 的 AMQP 连接,必填项。本示例中可以填写为 amqp://root:root123 192.168.0.2:5672

      • Prefetch count(prefetch count):定义 Federation 内部缓存的消息条数,即在收到上游消息之后且在发送到下游之前缓存的消息条数。

      • Reconnect delay(reconnect-delay): Federation link 由于某种原因断开之后, 需要等待多少秒开始重新建立连接。

      • Acknowledgement Mode(ack-mode):定义 Federation link 的消息确认方式。共有3种:

        • on-confirm:默认为 on-confirm,表示在接收到下游的确认消息(等待下游的 Basic.Ack )之后再向上游发送消息确认,这个选项 可以确保网络失败或者 Broker 密机时不会丢失消息,但也是处理速度最慢的选项。
        • on-publish:表示消息发送到下游后(并需要等待下游的 Basic.Ack ) 再向上游发送消息确认,这个选项可以确保在网络失败的情况下不会丢失消息,但不能确保 Broker 岩机时不会丢失消息。
        • no-acko:表示无须进行消息确认,这个选项处理速度最快,但也最容易丢失消息。
      • Trust User-ID(trust-user-id):设定 Federation 是否使用 Validated User-ID 这个功能。如果设置为 false 或者没有设置,那么 Federation 会忽略消息的 user_id 这个属性;如果设置为 true ,则 Federation 只会转发 user_id 为上游任意有效的用户的消息。

      • 所谓的 Validated User-ID 功能是指发送消息时验证消息的 user_id 的属性。channel.basicPublish 方法中有个参数是 BasicProperties 有一个属性 user_id :

        • 设置 user_id 属性为 root :

          1
          2
          3
          AMQP.BasicProperties properties = new AMQP.BasicProperties();
          properties.setUserid("root");
          channel.basicPublish("amq.fanout","", properties,"test user id".getBytes());
        • 如果在连接 Broker 时所用的用户名为 root,当发送 test user id 这条消息时设置的 user_id 的属性为 guest ,那么这条消息会发送失败,具体报错为 406 PRECONDITION_FAILED - user_id property set to 'guest' but authenticated user was 'root' ,只有当 user_id 设置为 root 时,这条消息才会发送成功。

  • 第三步,定义一个 Policy 用于匹配交换器 exchangeA ,并使用第二步中所创建的 upstream ,有三种设置方式:

    • 第一种是通过 rabbitmqctl 工具的方式:

      1
      2
      # 定义所有以exchange开头的交换器作为 federated exchange
      $ rabbitmqctl set_policy --apply-to exchanges p1 "^exchange" '{"federation upstream":"f1"}'
    • 第二种是通过调用 HTTP API 接口的方式:

      1
      2
      $ curl -i -u root:rootl23 -XPUT -d '{"pattern":"^exchange","definition":{"federation-upstream":"f1"},"apply-to":"exchanges"}' 
      http://192.168.0.4:15672/api/policies/%2F/p1
    • 第三种是通过在Web管理界面中添加的方式,【Admin】->【Federation Upstreams】->【Add/Update a policy】,创建一个 Federation link ,可以 Web 管理界面中【Admin】->【Federation Status】->【Running Links 】查看到相应的链接(或者通过指令 rabbitmqctl eval 'rabbit federation_status:status(). 查看相应的 Federation link)。

如何建立联邦队列

  • 首先同样也是定义一个 upstream 。

  • 之后定义 Policy 的时候略微有变化,比如使用 rabbitmqctl 工具的情况:

    1
    2
    # 定义所有以exchange开头的交换器作为 federated queue
    $ rabbitmqctl set_policy --apply-to queues p2 "^queue" '{"federation upstream":"f1"}'
  • 通常情况下,针对每个 upstream 会有一条 Federation link。此Federation link 对应到一个交换器上。例如, 3个交换器与2个upstream分别建立 Federation link 的情况下 ,会有6条连接。

只适用联邦交换器的参数:

  • Exchange (exchange):指定 upstream exchange 的名称,默认情况下和 federated exchange 同名,即图 8-2 中的 exchangeA 。
  • Max hops (max-hops):指定消息被丢弃前在 Federation link 中最大的跳转次数,默认为1。注意即使设置 max-hops 参数为大于1的值,同一条消息也不会在同一个 Broker 中出现2次,但是有可能会在多个节点中被复制。
  • Expires (expires):指定 Federation link 断开之后,federated queue 所对应的 upstream queue(即图 8-2 中的队列 federation: exchangeA->broker3 )的超时时间,默认为 none,表示为不删除,单位为 ms 。这个参数相当于设置普通队列的 x-expires 参数。设置这个值可以避免 Federation link 断开之后,生产者一直在向 broker1 中的 exchangeA 发送消息,这些消息又不能被转发到 broker3 中而被消费掉,进而造成 broker1 中有大量的消息堆积。
  • Message TTL (message-ttl):为 federated queue 所对应的 upstream queue (即 8-2 中的队列 federation: exchangeA->broker3 )设置,相当于普通队列的 x-message-ttl 参数,默认为 none 。表示消息没有超时时间。
  • HA policy (ha-policy):为 federated queue 所对应的 upstream queue (即 8-2 中的队列 federation: exchangeA->broker3 )设置,相当于普通队列的 x-ha-policy 参数,默认为 none,表示队列没有任何 HA。

只适用联邦队列的参数:

  • Queue (queue):执行 upstream queue 的名称,默认情况下和 federated queue 同名。

二. Shovel

2.1 简介

与 Federation 具备的数据转发功能类似, Shovel 能够可靠、持续地从一个 Broker 中的队列拉取数据并转发至另一个 Broker 中的交换器

作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker ,也可以位于不同的 Broker 上

Shovel 可以翻译为“铲子”,是一种比较形象的比喻,这个“铲子”可以将消息从一方“挖到”另一方。 Shovel 的行为就像优秀的客户端应用程序,能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。

2.1.1 优势

Shovel 的主要优势在于:

  • 松耦合: Shovel 可以移动位于不同管理域中的 Broker(或者集群)上的消息,这些 Broker(或者集群)可以包含不同的用户和 vhost ,也可以使用不同的 RabbitMQ Erlang 版本。
  • 支持广域网:Shovel 插件同样基于 AMQP 协议在 Broker 之间进行通信,被设计成可以容忍时断时续的连通情形,并且能够保证消息的可靠性。
  • 高度定制:当 Shovel 成功连接后,可以对其进行配置以执行相关的 AMQP 命令。

2.2 原理

Shovel 的结构示意图:两个Broker,broker1 中有交换器 exchange1 和队列 queue1 ,且这两者通过路由键 rk1 进行绑定;broker2 中有交换器 exchange2 和队列 queue2 ,且这两者通过路由键 rk2 进行绑定。

在队列 queue1 和交换器 exchange2 之间配置一个 Shovel link,当一条内容为 shovel test payload 的消息从客户端发送至交换器 exchange1 的时候,这条消息会经过图 8-15 中的数据流转最后存储在队列 queue2 中。

如果在配置 Shovel link 时设置了 add_forward_headers 参数为 true ,则在消费到队列 queue2 中这条消息的时候会有特殊的 headers 属性标记,详细内容可参考图 8-16 :

通常情况下,使用 Shovel 时配置队列作为源端,交换器作为目的端,但同样可以将队列配置为目的端:(虽然看起来像队列queue1直接通过 Shovel link 将消息转发给queue2,但其实还经过了broker2的默认交换器转发)

配置交换器为源端也可以,虽然看起来像交换器 exchange1 直接通过 Shovel link 将消息转发给 exchange2 ,但其实broker1会创建一个队列并绑定 exchange1 ,消息从 exchange1 过来后先存储在这个队列,然后Shovel再从这个队列中拉取消息并转发到exchange2。

Shovel 可以为源端或者目的端配置多个 Broker 的地址,这样可以使得源端或者目的端的 Broker 失效后能够尝试重连到其他 Broker 之上(随机挑选)。

可以设置 reconnect_delay 参数以避免由于重连行为导致的网络泛洪, 或者可以在重连失败后直接停止连接。针对源端和目的端的所有配置声明会在重连成功之后被重新发送。

2.3 使用

开启Shovel:

1
$ rabbitmq-plugins enable rabbitmq_shovel 

开启Shovel管理插件:

1
$ rabbitmq-plugins enable rabbitmq_shovel_management  

在 RabbitMQ 的管理界面中【Admin】的右侧会多出【Shovel Status 】和【Shovel Management】两个 Tab 页:

Shovel 既可以部署在源端,也可以部署在目的端。

有两种方式可以部署 Shovel:

  • 静态方式(static):指在 rabbitmq.config 配置文件中设置。
  • 动态方式(dynamic):指通过 Runtime Parameter 设置。

2.3.1 静态方式

rabbitmq.config 配置文件中针对 Shovel 插件的配置信息是一种 Erlang 项式,由单条 Shovel 条目构成:

1
{rabbitmq_shovel, [ {shovels, [ {shovel_name, [ ... ] }, ... ]} ]} 

每一条 Shovel 条目定义了源端与目的端的转发关系,其名称(Shovel name)必须是独一无二的:(其中 sources、destination 和 queue 三项是必需的,其余的都可以默认)

1
2
3
4
5
6
7
8
9
{shovel_name, [ {sources, [ ... ] }
, {destinations, [ ... ] }
, {queue, queue_name}
, {prefetch_count, count}
, {ack_mode, a_mode}
, {publish_properties, [ ... ] }
, {publish_fields , [ ... ] }
, {reconnect_delay, reconn_delay}
]}

详细配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
[{rabbitmq_shovel, 
[{shovels,
[{hidden shovel,
[{sources,
# broker项配置的是URI,定义了用于连接Shovel两端的服务器地址、用户名、密码、vhost和端口号等。
[{broker, "amqp://root:root123@192.168.0.2:5672 "},
# declarations这一项是可选的,declaration_list指定了可以使用的AMQP,声明了队列、交换器和绑定关系。
{declarations,
[
# 声明了队列,<<告诉Erlang是binary类型字符串
{'queue.declare', [{queue, <<"queue1">>}, durable]},
# 声明了交换器,durable需要赋值时才要加上{}
{'exchange.declare',[
{exchange, <<"exchange1">>},
{type, <<"direct">>},
durable
]
},
# 声明了绑定关系
{'queue.bind', [
{exchange, <<"exchange1">>},
{queue, <<"queue1">>},
{routing_key, <<"rk1">>}
}]}]},
{destinations,
[{broker,"amqp://root:root123@192.168.0.2:5672"},
{declarations,
[
{'queue.declare', [{queue, <<"queue2">>}, durable]},
{'exchange.declare',[
{exchange, <<"exchange2">>},
{type, <<"direct">>},
durable
]
},
{'queue.bind', [
{exchange, <<"exchange2">>},
{queue, <<"queue2">>},
{routing_key, <<"rk2">>}
}]}]},
# 表示源端服务器上的队列名称,<<>>则表示匿名队列
{queue, <<"queue1">>},
# 表示在完成转发消息时的确认模式
# no_ack表示无须任何消息确认行为
# no_publish表示Shovel会把每一条消息发送到目的端之后再向源端发送消息确认
# no_confirm表示Shovel会使用publisher confirm机制,在收到目的端的消息确认之后再向源端发送消息确认
{ack_mode, no_ack},
# 表示Shovel内部缓存的消息条数,Shovel的内部缓存是源端服务器和目的端服务器之间的中间缓存部分
{prefetch_count, 64},
# 指消息发往目的端时需要特别设置的属性列表
{publish_properties, [{delivery_mode, 2}]},
# 设置为true,会在转发的消息内添加x-shovelled的header属性
{add_forward_headers, true},
# 定义了消息需要发往目的端服务器上的交换器以及标记在消息上的路由键。如果交换器和路由键没有定义,则Shovel会从原始消息上复制这些被忽略的设置。
{publish_fields, [{exchange, <<"exchange2">>},
{routing_key, <<"rk2">>}]},
# 指定在Shovel link失效情况下,重新建立连接前需要等待的时间。如果设置为0,则不会进行重连动作,即Shovel会在首次连接失效时停止工作。默认为5秒。
{reconnect_delay, 5} J
}]
}]
}].

如果 sources 或者 destinations 是 RabbitMQ 集群,那么就使用 brokers ,并在其后用多个URI字符串以 [] 的形式包裹起来。

2.3.2 动态方式

Federation upstream 类似 Shovel 动态部署方式的配置信息会被保存 RabbitMQ Mnesia 数据库中,包括权限信息、用户信息和队列信息等内容。

每一个Shovel link都由一个相应的Parameter定义,Parameter设置方式:

  • 第一种,通过 rabbitmqctl 工具的方式:

    1
    2
    3
    4
    5
    6
    $ rabbitmqctl set_parameter shovel hidden_shovel \
    '{"src-uri":"amqp://root:root123@192.168.0.2:5672",
    "src-queue":"queue1",
    "dest-uri":"amqp://root:rootl23@192.168.0.3:5672","src- exchange-key":"rk2",
    "prefetch-count":64,"reconnect-delay":5,"publish-properties":[],
    "add-forward-headers":true,"ack-mode":"on-confirm"}'
  • 第二种,通过调用 HTTP API 接口的方式:

    1
    2
    3
    4
    5
    6
    $ curl -i -u root:rootl23 -XPUT d
    '{"value":{"src-uri":"amqp://root:root123@192.168.0.2:5672","src-queue":"queue1",
    "dest-uri":"amqp://root:rootl23@192.168.0.3:5672","src-exchange-key":"rk2",
    "prefetch-count":64, "reconnect-delay":5,"publish-properties":[],
    "add-forward-headers":true,"ack_mode":"confirm"}}'
    http://192.168.0.2:15672/api/parameters/shovel/%2f/hidden_shovel
  • 第三种,通过在Web管理界面中添加的方式,【Admin】->【Shovel Management】->【Add a new shovel】。

    • 在创建了一个 Shovel link 之后,可以在【Admin】->【Shovel Status】 中查看状态信息。
    • 也可以通过 rabbitmqctl eval 'rabbit shovel_status:status(). 查看相应的 Shovel link。

2.4 案例:消息堆积的治理

消息堆积是在使用消息中间件过程中遇到的最正常不过的事情。消息堆积是一把双刃剑,适量的堆积可以有削峰、缓存之用 ,但是如果堆积过于严重,就可能影响到其他队列的使用,导致整体服务质量的下降。

对于一台普通的服务器来说,在一个队列中堆积1万至10万条消息,丝毫不会影响什么。但是如果这个队列中堆积超过1千万乃至一亿条消息时,可能会引起一些严 重的问题,比如引起内存或者磁盘告警而造成所有 Connection 阻塞。

解决方案:

  • 消息堆积严重时,可以选择清空队列,或者采用空消费程序丢弃掉部分消息。不过对于重要的数据而言,丢弃消息的方案并无用武之地。
  • 另一种方案是增加下游消费者的消费能力,这个思路可以通过后期优化代码逻辑或者增加消费者的实例数来实现。但是后期的代码优化在面临紧急情况时总归是“远水解不了近渴”,并且有些业务场景也井非可以简单地通过增加消费实例而得以增强消费能力。
  • 当某个队列中的消息堆积严重时,比如超过某个设定的阑值,就可以通过 Shovel 将队列中的消息移交给另一个集群。

几种情形:

  • 情形1:当检测到当前运行集群 cluster1 中的队列 queue1 中有严重消息堆积,比如通过 /api/queues/vhost/name 接口获取到队列的消息个数(messages)超过2千万或者消息占用大小(messages_bytes)超过 10GB 时,就启用 shovel1 将队列 queue1 中的消息转发至备份集群 cluster2 中的队列 queue2。
  • 情形2:紧随情形1,当检测到队列 queue1 中的消息个数低于1百万或者消息占用大小低于1GB 时就停止 shovel1,然后让原本队列 queue1 中的消费者慢慢处理剩余的堆积。
  • 情形3:当检测到队列 queue1 中的消息个数低于 10 万或者消息占用大小低于100MB时, 就开启 shovel2 将队列 queue2 中暂存的消息返还给队列 queue1。
  • 情形4:紧随情形3,当检测到队列 queue1 中的消息个数超过1百万或者消息占用大小高于 1GB 时就将 shovel2 停掉。

如此,队列 queue1 就拥有了队列 queue2 这个“保镖”为它保驾护航。这里是一备一的情形,如果需要一备多,可以采用镜像队列或者引入 Federation。

三. Federation/Shovel 与集群的区别和联系

集群、Federation、Shovel这三种部署方式:

集群是最为通用的一种方式。集群将多个 Broker 节点连接起来组成逻辑上独立的单个 Broker 。集群内部借助 Erlang 进行消息传输,所以集群中的每个节点的 Erlang cookie 务必要保持一致。同时,集群内部的网络必须是可靠的,RabbitMQ Erlang 的版本也必须一致。虚拟主机、交换器、用户、权限等都会自动备份到集群中的各个节点。队列可能部署单个节点或被镜像到多个节点中。连接到任意节点的客户端能够看到集群中所有的队列,即使该队列不在所连接的节点之上。通常使用集群的部署方式来提高可靠性和吞吐量,不过集群只能部署在局域网内。

Federation,可以翻译为“联邦”。 Federation 可以通过 AMQP 协议(可配置 SSL)让原本发送到某个 Broker (或集群)中的交换器(或队列)上的消息能够转发到另一个 Broker (或集群)中的交换器(或队列)上,两方的交换器(或队列)看起来是以一种“联邦”的形式在运作。当然必须要确保这些“联邦”的交换器或者队列都具备合适的用户和权限。

联邦交换器(federated exchange)通过单向点对点的连接(Federation link)形式进行通信。默认情况下,消息只会由 Federation 连接转发一次,可以允许有复杂的路由拓扑来提高转发次数。在 Federation 连接上,消息可能不会被转发,如果消息到达了联邦交换器之后路由不到合适的队列,那么它也不会被再次转发到原来的地方(这里指上游交换器,即 upstream exchange)可以通过 Federation 连接广域网中的各个 RabbitMQ 服务器来生产和消费消息。联邦队列(federated queue)也是通过单向点对点连接进行通信的,消息可以根据具体的配置消费者的状态在联邦队列中游离任意次数。

通过 Shovel 来连接各个 RabbitMQ Broker ,概念上 Federation 的情形类似,不过 Shovel 工作在更低一层。鉴于 Federation 从一个交换器中转发消息到另一个交换器(如果必要可以确认消息是否被转发), Shovel 只是简单地从某个 Broker 上的队列中消费消息,然后转发消息到 Broker 上的交换器而已。Shovel 也可以在单独的一台服务器上去转发消息,比如将队列中的数据移动到另一个队列中。如果想获得比 Federation 更多的控制,可以在广域网中使用 Shovel 连接各个 RabbitMQ Broker 来生产或消费消息。

Federation/Shovel 集群
各个 Broke 节点之间逻缉分离 逻辑上是个 Broker 节点
各个 Broker 节点之间可以运行不同版本的 Erlang 和 RabbitMQ 各个 Broker 节点之间必须运行相同版本的 Erlang 和 RabbitMQ
各个 Broker 节点之间可以在广域网中相连,当然必须要授予适当的用户和权限 各个 Broker 节点之间必须在可信赖的局域网中相连,通过 Erlang 内部节点传递消息,但节点问需要有相同的 Erlang cookie
各个 Broker 节点之间能以任何拓扑逻辑部署,连接可以是单向的或者是双向的 所有 Broker 节点都双向连续所有其他节点
从 CAP 理论中选择可用性和分区耐受性,即 AP 从 CAP 理论中选择致性和可用性,CA
一个 Broker 中的交换器可以是 Federation 生成的或者是本地的 集群中所有 Broker 节点中的交换器都是一样的,要么全有要么全无
客户端所能看到它所连接的 Broker 节点上的队列 客户端连接到集群中的任何 Broker 节点都可以看到所有 的队列

参考:

🔗 《RabbitMQ实战指南》