RabbitMQ(二)客户端开发

RabbitMQ(二)客户端开发

一. 连接RabbitMQ

客户端给定参数(IP地址、端口号、用户名、密码等)连接RabbitMQ:

1
2
3
4
5
6
7
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(virtualHost);
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
Connection conn = factory.newConnection();

也可以用URL的方式实现:

1
2
3
4
5
6
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");
Connection conn = factory.newConnection();
//Connection 接口被用来创建一个 Channel:
Channel channel= conn.createChannel();
//在创建之后,Channel可以用来发送或者接收消息了

Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程问共享, 应用程序应该为每一个线程开辟一个 Channel 。多线程问共享 Channel 实例是非线程安全的。

Channel 或者 Connection 中有个 isOpen 方法可以用来检测其是否己处于开启状态,这个方法的返回值依赖于 shutdownCause ,不推荐在生产环境使用,可能会产生竞争。

1
2
3
4
5
public boolean isOpen() {
synchronized(this.monitor) {
return this.shutdownCause == null;
}
}

错误地使用 isOpen 方法:

1
2
3
4
5
6
7
8
9
public void brokenMethod(Channel channel){
if (channel.isOpen()){
//The following code depends on the channel being in opeηstate.
//However there is a possibility of the change in the channel state
//between isOpen() and basicQos(l) call
...
channel.basicQos(l);
}
}

通常情况下,在调用 createXXX 或者 newXXX 方法之后,我们可以简单地认为 Connection 或者 Channel 已经成功地处于开启状态,而并不会在代码中使用 isOpen 这个检测方法。如果在使用 Channel 的时候其己经处于关闭状态,那么程序会抛出一个 com.rabbitmq.client.ShutdownSignalException ,我们只需捕获这个异常即可。当然同时也要试着捕获 IOException 或者 SocketException ,以防 Connection 意外关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void validMethod(Channel channel){
try {
...
channel.basicQos(l);
} catch (ShutdownSignalException sse) {
//possibly check if channel was closed
//by the time we started action and reasons for
//closing it
...
} catch (IOException ioe) {
//check why connection was closed
...
}
}

二. 使用交换器和队列

交换器和队列是 AMQP high-level 层面的构建模块,应用程序需确保在使用它们的时候就已经存在了,在使用之前需要先声明(declare)它们。

1
2
3
4
5
6
//创建一个持久化的、非自动删除的、绑定类型为 direct 交换器
channel.exchangeDeclare(exchangeName, "direct", true);
//同时也创建一个非持久化的、排他的、自动删除的队列(此队列的名称由 RabbitMQ 自动生成)
String queueName = channel.queueDeclare().getQueue();
//使用路由键将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);

上面声明的队列具备如下特性:只对当前应用中同一个 Connection 层面可用,同一个 Connection 的不同 Channel 可共用,并且也会在应用连接断开时自动删除。

如果要在应用中共享一个队列,可以做如下声明:

1
2
3
4
5
//同上
channel.exchangeDeclare(exchangeName , "direct", true) ;
//队列被声明为持久化的 非排他的、非自动删除的,而且也被分配另一个确定的己知的名称(由客户端分配而非 RabbitMQ 自动生成)
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName , exchangeName , routingKey) ;

生产者和消费者都可以声明一个交换器或者队列。

2.1 exchangeDeclare()

exchangeDeclare() 用来声明交换器,有多种重载方法:

1
2
3
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable,
boolean autoDelete, boolean internal,
Map<String , Object> arguments) throws IOException;
  • Exchange.DeclareOk :用来标识成功声明了一个交换器。
  • exchange:交换器的名称
  • type:交换器的类型
    • 常见的如fanout,direct,topic。
  • durable:设置是否持久化
    • 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  • autoDelete:设置是否自动删除
    • 自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
    • 注意不能错误地把这个参数理解为:当与此交换器连接的客户端都断开时, RabbitMQ 会自动删除本交换器。
  • internal:设置是否是内置的的交换器
    • 客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • argument:其他一些结构化参数,比如 alternate-exchange

类似的方法:

  • exchangeDeclareNoWait:无返回值,不建议使用,刚声明完交换器紧接着就使用,可能导致因交换器未创建而发生异常。
  • exchangeDeclarePassive:用来检测相应的交换器是否存在,不存在抛出异常 404 channel exception 同时关闭 Channel 。
  • exchangeDelete:删除交换器。
  • exchangeDeleteNoWait:无返回删除。

2.2 queueDeclare()

queueDeclare() 创建一个队列:

1
2
Queue.DeclareOk queueDeclare() throws IOException; 
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
  • 无参方法默认构建一个 RabbitMQ命名的、排他的、自动删除的、非持久化的队列。
  • queue:队列名称
  • durable:设置是否持久化
    • 持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
  • exclusive:设置是否排他
    • 如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
    • 这里需要注意三点:
      • 排他队列是基于连接( Connection )可见的,同一个连接的不同信道(Channel) 是可以同时访问同一连接创建的排他队列;
      • “首次”是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
      • 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
  • autoDelete:设置是否自动删除。
    • 自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
  • arguments:设置队列的其他一些参数。
    • x-message-ttl x-expires x-max-length x-max-length-bytes x-dead-letter-exchange x-dead-letter-routing-key , x-max-priority 等。

注意:生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为“传输”模式,之后才能声明队列。

类似的方法这里不再赘述。

2.3 queueBind()

queueBind() 将队列与交换器绑定/解绑。

1
2
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
  • queue:队列名称;
  • exchange:交换器的名称;
  • routingKey:用来绑定队列和交换器的路由键;
  • arguments:定义绑定的一些参数。

2.4 exchangeBind()

exchangeBind() 将交换器与交换器绑定:

1
Exchange.BindOk exchangeBind(String destination, String source , String routingKey, Map<String, Object> arguments) throws IOException;

绑定之后,消息从 source 交换器转发到 destination 交换器,后者可以看作是一个队列。

1
2
3
4
5
6
7
8
9
10
11
//分别声明source和destination交换器
channel.exchangeDeclare("source", "direct", false, true, null) ;
channel.exchangeDeclare("destination", "fanout", false, true, null);
//绑定两者
channel.exchangeBind("destination", "source", "exKey");
//声明队列
channel.queueDeclare("queue", false, false, true, null);
//绑定destination交换器和队列
channel.queueBind("queue", "destination""");
//source推送消息
channel.basicPublish("source", "exKey", null, "exToExDemo". getBytes());

生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配 的另一个交换器 destination 井把消息转发到 destination 中,进而存储在 destination 绑定的队列 queue 中:

2.5 何时创建

RabbitMQ 的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。 如果要衡量 RabbitMQ 当前的 QPS ,只需看队列的即可。在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配。

按照 RabbitMQ 官方建议,生产者和消费者都应该尝试创建(这里指声明操作)队列。这是一个很好的建议,但不适用于所有的情况。如果业务本身在架构设计之初己经充分地预估了队列的使用情况,完全可以在业务程序上线之前在服务器上创建好(比如通过页面管理、 RabbitMQ 命令或者更好的是从配置中心下发),这样业务程序也可以免去声明的过程,直接使用即可。

预先创建好资源还有一个好处是,可以确保交换器和队列之间正确地绑定匹配。很多时候, 由于人为因素、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失;或者交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失。 当然可以配合 mandatory 参数或者备份交换器来提高程序的健壮性

与此同时,预估好队列的使用情况非常重要,如果在后期运行过程中超过预定的阈值,可以根据实际情况对当前集群进行扩容或者将相应的队列迁移到其他集群。迁移的过程也可以对业务程序完全透明。此种方法也更有利于开发和运维分工,便于相应资源的管理。

如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也完全可以在业务程序中声明队列。 至于是使用预先分配创建资源的静态方式还是动态的创建方式,需要从业务逻辑本身、公司运维体系和公司硬件资源等方面考虑。


三. 发送消息

channel.basicPublish() 可以用来发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//发送一条持久化的消息 hello world!
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

//mandatory参数可以更好的控制发送
channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes) ;

//这条消息的投递模式(delivery mode)为2,消息会被持久化到服务器中
//消息的优先级设置为1,contentType为text/plain
channel.basicPublish(exchangeName, routingKey,
new AMQPBasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userld("hidden")
.build()), messageBodyBytes);

//也可以设置消息的headers
Map<String, Object> headers = new HashMap<String Object>();
headers.put("localtion""here");
headers.put("time", "today");
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()), messageBodyBytes);

//还可以带过期时间
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()), messageBodyBytes);

包括各种重载方法,有以下具体参数:

  • exchange:交换器的名称,指明消息需要发送到哪个交换器中,如果设置为空字符串, 则消息会被发送到 RabbitMQ 默认的交换器中。
  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中
  • props:消息的基本属性集,其包含 14 个属性成员,分别有 contentTypecontentEncodingheaders(Map<String, Object>)deliveryModeprioritycorrelationIdreplyToexpirationmessageIdtimestamptypeuserIdappIdclusterId
  • byte[] body:消息体,真正要发送的消息。
  • mandatory:为 true 时,且交换器无法根据自身的类型和路由键找到一个符合条件的队列,调用 Basic.Return 命令将消息返回给生产者。为 false 时,消息直接被丢弃。
  • immediate:为true时,若交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时, 该消息会通过 Basic.Return 返回至生产者。

四. 消费消息

两种模式:

  • 推模式:采用 Basic.Consume 进行消费;
  • 拉模式:采用 Basic.Get 进行消费。

4.1 推模式

通过持续订阅的方式消费消息,接收消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现。不同的订阅采用不同的消费者标签(ConsumerTag)来区分彼此。

主要通过方法 channel.basicConsume()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
      
//接收到消息之后进行显式 ack 操作 channel.basicAck 对于消费者很必要,防止消息不必要的消失
boolean autoAck = false;
channel.basicQos(64); //设置客户端最多接收未被 ack 的消息的个数
channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// process the message components here ...
channel.basicAck(envelope.getDeliveryTag(), false);
}
});

常用参数:

  • queue:队列的名称;
  • autoAck:设置是否自动确认。建议设成 false ,即不自动确认;
  • consumerTag:消费者标签,用来区分多个消费者;
  • noLocal 设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者;
  • exclusive:设置是否排他 ;
  • arguments:设置消费者的其他参数;
  • callback:设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息,比如 DefaultConsumer 使用时需要客户端重写其中的方法。

消费者客户端可以重写多种方法:

  • handleDelivery()
  • handleConsumerOk() :会在其他方法之前调用,返回消费者标签。
  • handleCancelOk() :消费端可以在显式地取消订阅的时候调用。
  • handleCancel() :消费端可以在隐式地取消订阅的时候调用
  • handleShutdownSignal() :当 Channel 或者 Connection 关闭的时候会调用。
  • handleRecoverOk()

通过 channel.basicCancel() 显式地取消一个消费者的订阅,先触发 handleConsumerOk() ,然后是 handleDelivery() ,最后是 handleCancelOk()

和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些 callback 会被分配到与 Channel 不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,比如 channel.queueDeclarechannel.basicCancel 等。

每个 Channel 都拥有自己独立的线程。最常用的做法是一个 Channel 对应一个消费者, 也就是意味着消费者彼此之间没有任何关联。当然也可以在一个 Channel 中维持多个消费者, 但是要注意一个问题,如果 Channel 中的一个消费者一直在运行,那么其他消费者的 callback 会被“耽搁”。

4.2 拉模式

通过 channel.basicGet() 可以单条的获取消息(连续调用拉取多条消息),当 autoAck 设置为false,需要用 channel.basicAck() 来确认消息已被成功接收。

1
2
3
GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out,println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);

Basic.Consume 将信道(Channel)置为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos 的限制.如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费.但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume ,这样做会严重影响 RabbitMQ 的性能.如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。

批量拉取:

  1. 批量拉取循环的退出条件:达到数量上限,basicGet返回null。
  2. 使用basic批量ACK传递的参数是最后一条消息的deliveryTag。
  3. DefaultConsumer 运行在Connection的线程池中不同,使用拉模式需要自己创建线程池。

4.3 对比

(1)生产者和服务节点间都是推模式

一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。 Producer 与 Broker 之间都是推的方式,即 Producer 将消息推送给 Broker,而不是 Broker 主动去拉取消息。原因如下:

  1. 如果需要 Broker 去拉取消息,那么 Producer 就必须在本地通过日志的形式保存消息来等待 Broker 的拉取,如果有很多生产者的话,那么消息的可靠性不仅仅靠 Broker 自身,还需要靠成百上千的 Producer。
  2. Broker 还能靠多副本等机制来保证消息的存储可靠,而成百上千的 Producer 可靠性就有点难办了,所以默认的 Producer 都是推消息给 Broker。
MQ类别 推拉模式
RabbitMQ 推拉都支持。官方推荐使用推模式
RocketMQ 推拉都支持。(本质上,推模式也是拉模式)
Kafka 只有拉模式

(2)推、拉模式的优缺点

  • 推模式:指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。
    • 优点:
      1. 消息实时性高。Broker 接受完消息之后可以立马推送给 Consumer。
      2. 对于消费者使用来说更简单。消息来了就消费即可。
    • 缺点:推送速率难以适应消费速率
      1. 推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊。当推送速率过快就像 DDos 攻击一样消费者就傻了。
      2. 不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。这其实就增加了 Broker 自身的复杂度。
    • 使用场景:推模式难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况下。
  • 拉模式:指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。
    • 优点:
      1. 消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
      2. Broker 相对轻松了。它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。
      3. 可以更合适的进行消息的批量发送。基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。
    • 缺点:
      1. 消息延迟。毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。
      2. 消息忙请求。忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。
    • 使用场景:消费者在某个条件成立时才能消费消息,以及需要批量拉取消息进行处理。

五. 消费端的确认与拒绝

为了保证消息从队列可靠地达到消费者, RabbitMQ 提供了消息确认机制 (message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

采用消息确认机制后,只要设置 autoAck 参数为 false ,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题 因为 RabbitMQ 会一直等待持有消息直到消费者显式调 Basic.Ack 命令为止。

当 autoAck 参数置为 false ,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbtiMQ 的 Web 管理平台上可以看到当前队列中的“Ready ”状态 和“Unacknowledged "状态的消息数,分别对应上文中的等待投递给消费者的消息数和己经投递给消费者但是未收到确认信号的消息数:

也可以通过相应的命令来查看上述信息:

1
2
3
4
$ rabbitmqctl list_queues name message_ready messages_unacknowledged
Listing queues ...
queue 1 0
queue_demo 0

在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,那么应该怎么做呢?通过 Basic.Reject 这个命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。

1
void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • deliveryTag:消息编号,64位的长整型值。
  • requeue:当为true时,RabbitMQ会重新将此条消息存入队列,以便可以发送给下一个订阅的消费者;当为false时,立即将消息从队列中移除。

批量拒绝消息需要 Basic.Nack ,客户端:

1
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • multiple:为false时,表示拒绝单条消息;为true时,拒绝编号前所有未被消费者确认的消息。

channel.basicRecover() 请求RabbitMQ重新发送还未被确认的消息:

1
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
  • requeue:为true,未被确认的消息会被重新加入到队列,可能会被分配给另外一个消费者;为false,同一条消息会被分配给之前相同的消费者。

六. 关闭连接

在应用程序使用完之后,需要关闭连接,释放资源:

1
2
channel.close();
conn.close();

在 Connection 关闭的时候, Channel 也会自动关闭。

Connection Channel 所具备的生命周期如下所述:

  • Open:开启状态,代表当前对象可以使用。
  • Closing:正在关闭状态。当前对象被显式地通知调用关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行操作并等待这些关闭操作的完成。
  • Closed:已经关闭状态。当前对象己经接收到所有的内部对象己完成关闭动作的通知,并且其也关闭了自身。

当 Connection 或者 Channel 的状态转变为 Closed 的时候会调用 ShutdownListener 。而且如果将一个 ShutdownListener 注册到一个己经处于 Closed 状态的对象(这里特指 Connection 和 Channel 对象),会立刻调用 ShutdownListener

getCloseReason 方法可以让你知道对象关闭的原因 isOpen 方法检测对象当前是否处于开启状态: close(int closeCode , String closeMessage) 方法显式地通知当前对象执行关闭操作。

1
2
3
4
5
connection.addShutdownListener(new ShutdownListener(){
public void shutdownCompleted(ShutdownSignalException cause){
......
}
});

当触发 ShutdownListener 的时候,就可以获取到 ShutdownSignalException ,这个 ShutdownSignalException 包含了关闭的原因,可以通过getCloseReason 方法获取。 isHardError 方法 可以知道是 Connection 的还是 Channel 的错误; getReason 方法可以获取 cause 相关的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdownCompleted(ShutdownSignal cause){
if(cause.isHardError()){
Connection conn = (Connection) cause.getReference();
if(!cause.isInitiatedByApplication()){
Method reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel) cause.getReference();
...
}
}

七. 消息删除

  1. 在web管理端可以选择要操作的队列,下面有一个Purge按钮。

  2. 清空指定queue队列的数据。

    1
    2
    3
    4
    #查看队列
    $ rabbitmqctl list_queues
    #清空指定队列
    $ rabbitmqctl purge_queue queue_name
  3. 用rabbitmqadmin清空队列

    1
    2
    3
    4
    #查看队列
    $ rabbitmqadmin list queues
    #清空指定队列
    $ rabbitmqadmin delete queue name=queue_name
  4. 清空全部队列(慎用)

    1
    2
    3
    $ rabbitmqctl stop_app
    $ rabbitmqctl reset
    $ rabbitmqctl start_app

参考:

🔗 《RabbitMQ实战指南》

🔗 MQ–推模式与拉模式

🔗 RabbitMQ拉模式批量消费消息