Spring AMQP

Spring AMQP

一. 简介

1.1 什么是Spring AMQP

Advanced Message Queuing Protocol,即高级消息队列协议,旨在提供一种跨平台跨语言的消息服务,AMQP是一种应用层协议,不同于JMS(API接口),使用 TCP 提供可靠投递的应用层协议。

Spring AMQP则是Spring方便开发RabbitMQ程序集成的一个第三方类库。

  • 提供“模板”作为用于发送和接收消息的高级抽象。
  • 还为消息驱动的 POJO 提供支持。

1.2 简单使用

(1)Maven依赖

引入Maven依赖:

1
2
3
4
5
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>

此包同时包含:

  • spring-context
  • spring-tx
  • spring-web
  • spring-messaging
  • spring-retry
  • spring-amqp
  • amqp-client

(2)快速上手

基于默认配置:

1
2
3
4
5
6
ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

(3)XML配置

基于XML配置:

1
2
3
4
ApplicationContext context = new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
1
2
3
4
5
6
7
8
9
10
11
12
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="myqueue"/>
</beans>

(4)Java配置

同样的示例使用 Java 中的外部配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
	ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

........

@Configuration
public class RabbitConfiguration {

@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}

@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}

@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}

二. 使用

2.1 ConnectionFactory默认配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Configuration
public class MqProducerConfig {

@Bean
public ConnectionFactory amqpConnectionFactory(ConnectionListener connectionListener,
RecoveryListener recoveryListener,
ChannelListener channelListener) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// 配置rabbitmq的地址、端口,集群部署的情况下可填写多个,“,”分隔
connectionFactory.setAddresses("localhost:5672");
// 用户名
connectionFactory.setUsername("guest");
// 密码
connectionFactory.setPassword("guest");
// 虚拟主机
connectionFactory.setVirtualHost("/");
// 设置缓存模式,共有两种,CHANNEL和CONNECTION模式
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
// 设置每个Connection中可以缓存的Channel的数量,非数量上限,获取Channel前会先尝试从缓存中找到一个闲置的,若无则会创建新的,当数量超过缓存数量时,多出来的会被关闭。该值修改只会影响后创建的Connection
connectionFactory.setChannelCacheSize(25);
// 设置Channel等待时间,大于0时setChannelCacheSize会同时变为数量上限,从缓存取不到可用的Channel时不会创建新的,而是等待该值指定的毫秒时间,若到点仍无可用的Channel会抛出AmqpTimeoutException异常;Connection模式下,该值也同样是Connection的获取超时时间
connectionFactory.setChannelCheckoutTimeout(0);
// Producer端消息确认机制开关,return机制
connectionFactory.setPublisherReturns(false);
// Producer端消息确认机制开关,confirm机制
connectionFactory.setPublisherConfirms(false);
// 配置ConnectionListener
connectionFactory.addConnectionListener(connectionListener);
// 配置ChannelListener
connectionFactory.addChannelListener(channelListener);
// 配置RecoveryListener
connectionFactory.setRecoveryListener(recoveryListener);
// 仅在Connection模式下使用,设置Connection的缓存数量
//connectionFactory.setConnectionCacheSize(1);
// 仅在Connection模式下使用,设置Connection的数量上限
//connectionFactory.setConnectionLimit(Integer.MAX_VALUE);
return connectionFactory;
}
}
  • Channel模式:程序运行期间ConnectionFactory会维护一个Connection,所有的操作都使用此Connection,但一个Connection中可以有多个Channel,操作rabbitmq之前都必须先获取到一个Channel,否则就会阻塞(可以通过 setChannelCheckoutTimeout() 设置等待时间),这些Channel会被缓存(缓存的数量可以通过 setChannelCacheSize() 设置);
  • Connection模式:这个模式下允许创建多个Connection,会缓存一定数量的Connection,每个Connection中同样会缓存一些Channel,除了可以有多个Connection,其它都跟CHANNEL模式一样。

2.2 com.rabbitmq.client.ConnectionFactory自定义配置

ConnectionFactory类连接MQ节点的Connection配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/** Default user name */

public static final String DEFAULT_USER = "guest";

/** Default password */

public static final String DEFAULT_PASS = "guest";

/** Default virtual host */

public static final String DEFAULT_VHOST = "/";

/** Default maximum channel number;

* zero for unlimited */

public static final int DEFAULT_CHANNEL_MAX = 0;

/** Default maximum frame size;

* zero means no limit */

public static final int DEFAULT_FRAME_MAX = 0;

/** Default heart-beat interval;

* 60 seconds */

public static final int DEFAULT_HEARTBEAT = 60;

/** The default host */

public static final String DEFAULT_HOST = "localhost";

/** 'Use the default port' port */

public static final int USE_DEFAULT_PORT = -1;

/** The default non-ssl port */

public static final int DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT;

/** The default ssl port */

public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;

/** The default TCP connection timeout: 60 seconds */

public static final int DEFAULT_CONNECTION_TIMEOUT = 60000;

/**

* The default AMQP 0-9-1 connection handshake timeout. See DEFAULT_CONNECTION_TIMEOUT

* for TCP (socket) connection timeout.

*/

public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10000;

/** The default shutdown timeout;

* zero means wait indefinitely */

public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;

/** The default continuation timeout for RPC calls in channels: 10 minutes */

public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = (int) MINUTES.toMillis(10);

/** The default network recovery interval: 5000 millis */

public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;

private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";

private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";

修改默认连接配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean
public ConnectionFactory connectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory);
return connectionFactory;
}

@Bean
public com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory() {
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setAutomaticRecoveryEnabled(false);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}

2.3 AmqpTemplate配置

若消费者端通过 @RabbitListener 注解的方式接收消息,则用不到此Bean。不建议直接使用ConnectionFactory获取Channel操作MQ,而是使用AmqpTemplate。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory amqpConnectionFactory,
RabbitTemplate.ReturnCallback returnCallback,
RabbitTemplate.ConfirmCallback confirmCallback,
RetryTemplate retryTemplate,
MessageConverter messageConverter){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 设置spring-amqp的ConnectionFactory
rabbitTemplate.setConnectionFactory(amqpConnectionFactory);
// 设置重试机制
rabbitTemplate.setRetryTemplate(retryTemplate);
// 设置MessageConverter,用于java对象与Message对象(实际发送和接收的消息对象)之间的相互转换
rabbitTemplate.setMessageConverter(messageConverter);
// 打开或关闭Channel的事务
rabbitTemplate.setChannelTransacted(false);
// return机制的回调接口
rabbitTemplate.setReturnCallback(returnCallback);
// confirm机制的回调接口
rabbitTemplate.setConfirmCallback(confirmCallback);
// 设为true使ReturnCallback生效
rabbitTemplate.setMandatory(false);
return rabbitTemplate;
}

2.4 RabbitListenerContainerFactory配置

这个bean仅在消费者端通过 @RabbitListener 注解的方式接收消息时使用,每一个 @RabbitListener 注解的方法都会由这个RabbitListenerContainerFactory创建一个MessageListenerContainer,负责接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory, ErrorHandler errorHandler, MessageConverter messageConverter) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 设置spring-amqp的ConnectionFactory
factory.setConnectionFactory(cachingConnectionFactory);
// 对于消费者端,MessageConverter也可以在这里配置
factory.setMessageConverter(messageConverter);
// 设置消费者端的应答模式,共有三种:NONE、AUTO、MANUAL
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个
factory.setConcurrentConsumers(1);
// 设置每个MessageListenerContainer将会创建的Consumer的最大数量,默认等于最小数量
factory.setMaxConcurrentConsumers(1);
// 设置每次请求发送给每个Consumer的消息数量
factory.setPrefetchCount(250);
// 设置Channel的事务
factory.setChannelTransacted(false);
// 设置事务当中可以处理的消息数量
factory.setTxSize(1);
// 设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃
factory.setDefaultRequeueRejected(true);
// 实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理
factory.setErrorHandler(errorHandler);
return factory;
}

消费者端的应答模式:

  • NONE:无应答,这种模式下rabbitmq默认consumer能正确处理所有发出的消息,所以不管消息有没有被consumer收到,有没有正确处理都不会恢复。
  • AUTO:由Container自动应答,正确处理发出ack信息,处理失败发出nack信息,rabbitmq发出消息后将会等待consumer端的应答,只有收到ack确认信息才会把消息清除掉,收到nack信息的处理办法由setDefaultRequeueRejected()方法设置,所以在这种模式下,发生错误的消息是可以恢复的。
  • MANUAL:基本同AUTO模式,区别是需要人为调用方法给应答。

2.5 发送消息

AmqpTamplate的发送消息接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)

throws AmqpException;

如下:

1
amqpTemplate.send(exchange, routingKey, new Message(JSON.toJSONString(event).getBytes(), MessagePropertiesBuilder.newInstance().build()));

2.6 接收消息

(1)拉模式

1
2
3
4
5
6
7
8
9
// 轮询调用方法一次获取一条
Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

// 如果queue里面没有消息,会立刻返回null;传入timeoutMillis参数后可阻塞等待一段时间
Message receive(String queueName, long timeoutMillis) throws AmqpException;

直接转换为Java对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

// 这四个方法需要配置org.springframework.amqp.support.converter.SmartMessageConverter,这是一个接口,Jackson2JsonMessageConverter已经实现了这个接口,所以只要将Jackson2JsonMessageConverter设置到RabbitTemplate中即可。
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;

<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;

<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;

<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type)

throws AmqpException;

Foo<Bar<Baz, Qux>> foo = rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Foo<Bar<Baz, Qux>>>() { });

(2)推模式

通过方法 channel.basicConsume() 实现推模式,或者使用注解 @RabbitListener

1
2
3
4
5
6
7
8
@Component
public class RabbitMqListener {

@RabbitListener(queues = "queueName")
public void listen(Message message) {
JSON.parseObject(new String(message.getBody()), typeReference);
}
}

如果在@RabbitListener注解中指明binding信息,就能自动创建queue、exchange并建立binding关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// direct和topic类型的exchange需要routingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", durable = "true"),
key = "orderRoutingKey.#")
)

// fanout类型的exchange
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", durable = "true", type = ExchangeTypes.FANOUT))
)

// 2.0版本之后,可以指定多个routingKey:key = { "red", "yellow" }
// 并且支持arguments属性,可用于headers类型的exchange
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "foo", value = "bar"),
@Argument(name = "baz")
})
)

@Queue有两个参数:

  • exclusive:排他队列,只对创建这个queue的Connection可见,Connection关闭则queue删除;

  • autoDelete:没有consumer对这个queue消费时删除。

对于这两种队列,durable=true是不起作用的。另外,如果注解申明的queue和exchange及binding关系都已经存在,但与已存在的设置不同,比如,已存在的exchange的是direct类型,这里尝试改为fanout类型,结果是不会有任何影响,不论是修改或者新增参数都不会生效。

如果queue存在,exchange存在,但没有binding,那么程序启动后会自动建立起binding关系。

消费者消息处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 手动ACK消息,后一个参数用于多条ACK,<=deliveryTag同时ACK掉。
channel.basicAck(deliveryTag, false);

// 不确认deliveryTag对应的消息
// 第二个参数是否应用于多消息
// 第三个参数是否重新放入队列requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息,nack后的消息也会被自己消费到。
channel.basicNack(deliveryTag, false, true);
// 抛弃此消息
channel.basicNack(deliveryTag, false, false);

// 拒绝deliveryTag对应的消息
// 第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。该方法reject后,该消费者还是会消费到该条被reject的消息。
channel.basicReject(deliveryTag, true);

// 是否恢复消息到队列,参数为是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
channel.basicRecover(true);

2.7 MessageConverter

在发送和接收消息时自动完成Message和Java对象的转换。

MessageConverter 接口:

1
2
3
4
5
6
7
public interface MessageConverter {
@Nullable
Object fromMessage(Message<?> message, Class<?> targetClass);

@Nullable
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
}

即使不手动配置MessageConverter,也会有一个默认的SimpleMessageConverter,它会直接将java对象序列化。

官方文档不建议使用这个MessageConverter,因为SimpleMessageConverter是将java对象在producer端序列化,然后在consumer端反序列化,这会将producer和consumer紧密地耦合在一起,并且仅限于java平台。

推荐用JsonMessageConverter、Jackson2JsonMessageConverter,这两个是都将java对象转化为json再转为 byte[] 来构造Message对象,前一个用的是jackson json lib,后一个用的是jackson 2 json lib。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}

@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter messageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter);
return template;
}

2.8 异常处理

spring-rabbit 暴露了两个接口可供实现用来处理 @RabbitListener 注解方法抛出的异常:

  • RabbitListenerErrorHandler
  • org.springframework.util.ErrorHandler

RabbitListenerErrorHandler,设置在 @RabbitListener 注解上,只对当前注解的方法生效(当前方法抛异常时被调用):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean
public RabbitListenerErrorHandler rabbitListenerErrorHandler(){
return new RabbitListenerErrorHandler() {
@Override
public Object handleError(Message amqpMessage,
org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception) throws Exception {
System.out.println(message);
throw exception;
}
};
}

@RabbitListener(queues = "test_queue_1", errorHandler = "rabbitListenerErrorHandler")
public void listen(Message message){
...
}


ErrorHandler,是 spring-core 包下面的 ErrorHandler ,可实现这个接口设置在 RabbitListenerContainerFactory 里面,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Bean
@Autowired
public RabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory,
MessageConverter messageConverter){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setMessageConverter(messageConverter);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
throw new AmqpRejectAndDontRequeueException(t);
}
});
factory.setDefaultRequeueRejected(false);
return factory;
}

这个ErrorHandler对所有@RabbitListener注解方法生效。

二者对比:

  • 作用范围:RabbitListenerErrorHandler只对当前 @RabbitListener 注解方法生效,ErrorHandler对所有 @RabbitListener 注解方法生效;
  • 调用顺序:RabbitListenerErrorHandler先被调用,ErrorHandler后被调用;
  • 处理粒度:RabbitListenerErrorHandler粒度比较细,可以获取到当前Message,以便做细致处理,ErrorHandler只能获取到Throwable参数;
  • 默认配置:RabbitListenerErrorHandler没有默认配置,ErrorHandler有默认值ConditionalRejectingErrorHandler。

ConditionalRejectingErrorHandler的作用:

  • 打印日志;
  • 部分异常导致的失败不会requeue消息(默认处理失败的消息会requeue,AcknowledgeMode.NONE模式除外)。

2.9 事务

amqp事务仅仅适用于publish和ack,rabbitmq增加了reject的事务。其它操作都不具备事务特性。

也就是说,rabbitmq本身的事务可以保证producer端发出的消息成功被broker收到(不能保证一定会进入queue),consumer端发出的确认信息成功被broker收到,其它诸如consumer端具体的消费逻辑之类如果想要获得事务功能,需要引入外部事务。

引入rabbitmq事务很简单,将RabbitTemplate或者RabbitListenerContainerFactory的channelTransacted属性设为true即可,示例:

1
2
3
4
5
6
7
8

@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory amqpConnectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(amqpConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}

也可以直接操作Channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Channel channel = cachingConnectionFactory.createConnection().createChannel(true);
try {
//channel.txSelect();上面createChannel已经设为true了,这句可以去掉
channel.basicPublish("xxx", "xxx", new AMQP.BasicProperties(), JSON.toJSONString(event).getBytes());
channel.txCommit();
} catch (IOException e) {
try {
channel.txRollback();
} catch (IOException e1) {
}
} finally {
try {
channel.close()
} catch (Exception e) {
}
}

需要注意的是,直接通过Connection获取的Channel需要手动close。

对于producer端,同样的发送一条消息到一个不存在的exchange:

1
amqpTemplate.convertAndSend("notExistExchange", "routingKey", object);
  • 如果关闭事务,CachingConnectionFactory会打出一条错误日志,但程序会正常运行。
  • 如果打开事务,由于消息没有到达broker,这里会抛出异常。

对于consumer端,当consumer正在处理一条消息时:

  • 如果broker挂掉,程序会不断尝试重连,当broker恢复时,会重新收到这条消息;
  • 如果程序挂掉,broker发现还没有收到consumer的确认信息但consumer没了,会将这条消息恢复;
  • 长时间没有收到consumer端的确认信息,也会将消息从unacked状态变成ready状态;
  • 如果程序处理消息期间抛异常,broker会收到一个nack或者reject,也会将这条消息恢复。

所以,rabbitmq是可以将没有成功消费的消息恢复的,consumer端使用rabbitmq事务的意义并不是很大,也许可以用于consumer端消息去重:

  • consumer处理成功向rabbitmq发出了ack,consumer默认rabbitmq收到了这个ack所以consumer认为这条消息处理结束,但实际可能rabbitmq没有收到ack又将这条消息放回queue然后重新发给consumer导致消息重复处理。如果开启了事务,能保证rabbitmq一定能收到确认信息,否则事务提交失败。

另外,需要注意的是,开启事务会大幅降低消息发送及接收效率,因为当已经有一个事务存在时,后面的消息是不能被发送或者接收(对同一个consumer而言)的,所以以上两种场景都不推荐使用事务来解决。

2.10 Listeners

ChannelListener接口,监听Channel的创建和异常关闭:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
public ChannelListener channelListener() {
return new ChannelListener() {
@Override
public void onCreate(Channel channel, boolean transactional) {
logger.info("channel number:{}, nextPublishSqlNo:{}",
channel.getChannelNumber(),
channel.getNextPublishSeqNo());
}

@Override
public void onShutDown(ShutdownSignalException signal) {
logger.error("channel shutdown, reason:{}, errorLevel:{}",
signal.getReason().protocolMethodName(),
signal.isHardError() ? "connection" : "channel");
}
};
}

BlockedListener监听Connection的block和unblock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean
public BlockedListener blockedListener() {
return new BlockedListener() {
@Override
public void handleBlocked(String reason) throws IOException {
logger.info("connection blocked, reason:{}", reason);
}

@Override
public void handleUnblocked() throws IOException {
logger.info("connection unblocked");
}
};
}

ConnectionListener监听Connection的创建、关闭和异常终止:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean
public ConnectionListener connectionListener() {
return new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
logger.info("connection created.");
}

public void onClose(Connection connection) {
logger.info("connection closed.");
}

public void onShutDown(ShutdownSignalException signal) {
logger.error("connection shutdown, reason:{}, errorLevel:{}",
signal.getReason().protocolMethodName(),
signal.isHardError() ? "connection" : "channel");
}
};
}

RecoveryListener监听开始自动恢复Connection、自动恢复连接完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean
public RecoveryListener recoveryListener() {
return new RecoveryListener() {
@Override
public void handleRecovery(Recoverable recoverable) {
logger.info("automatic recovery completed");
}

@Override
public void handleRecoveryStarted(Recoverable recoverable) {
logger.info("automatic recovery started");
}
};
}

ConnectionListener、ChannelListener、RecoveryListener设置到ConnectionFactory即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean
public CachingConnectionFactory cachingConnectionFactory(ConnectionListener connectionListener, ChannelListener channelListener, RecoveryListener recoveryListener) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(mqConfigBean.getAddresses());
connectionFactory.setUsername(mqConfigBean.getUsername());
connectionFactory.setPassword(mqConfigBean.getPassword());
connectionFactory.setVirtualHost(mqConfigBean.getVirtualHost());
connectionFactory.addConnectionListener(connectionListener);
connectionFactory.addChannelListener(channelListener);
connectionFactory.setRecoveryListener(recoveryListener);
connectionFactory.setChannelCacheSize(3);
return connectionFactory;
}

通过ConnectionListener和ChannelListener可以debug看出Connection和Channel都是有缓存的,因为 onCreate() 方法不会每次都调用。并且Connection和Channel的创建都是lazy的,程序启动时不会创建Connection和Channel,在第一次用到的时候才会创建。

2.11 多个@RabbitListener消费一个queue

一个服务中可以有多个@RabbitListener注解的方法消费一个queue,如下:

1
2
3
4
5
6
7
8
9
10

@RabbitListener(queues = "queueName")
public void listener1(Message message) {
...
}

@RabbitListener(queues = "queueName")
public void listener2(Message message) {
...
}

这样写使用的仍是同一个Connection,一条消息也不会被两个方法都调用,如果RabbitListenerContainerFactory中设置concurrentConsumer为3,意味着每个方法产生3个consumer,一共会有6个consumer对这个queue进行消费。也可以分布在不同的应用程序中,那样会在不同的Connection中。

一个服务中有如上的两个方法消费同一个queue,另一个服务中有一个方法消费同一个queue,则会有两个消费者Connection,一个有3个Channel,一个有6个Channel。

2.12 生产者消息确认机制confirm and return

为了能让producer端知道消息是否成功进入了queue,并且避免使用事务大幅降低消息发送效率,可以用confirm和return机制来代替事务。

首先实现两个Callback,ReturnCallback和ConfirmCallback,需要哪个实现哪个,不一定都需要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public RabbitTemplate.ReturnCallback returnCallback() {
return new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("return call back");
}
};
}

public RabbitTemplate.ConfirmCallback confirmCallback() {
return new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("confirm call back");
}
};
}

然后将这两个Callback设置到RabbitTemplate中,将mandatory属性设为true(ReturnCallback需要,ConfirmCallback不需要):

1
2
3
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setMandatory(true);

然后在ConnectionFactory中将这Confirm和Return机制打开:

1
2
connectionFactory.setPublisherReturns(true);
connectionFactory.setPublisherConfirms(true);

调用条件:

  • ConfirmCallback:每一条发出的消息都会调用ConfirmCallback;

  • ReturnCallback:只有在消息进入exchange但没有进入queue时才会调用。

参数:

  • correlationData:RabbitTemplate的send系列方法中有带这个参数的,如果传了这个参数,会在回调时拿到;
  • ack:消息进入exchange,为true,未能进入exchange,为false,由于Connection中断发出的消息进入exchange但没有收到confirm信息的情况,也会是false;
  • cause:消息发送失败时的失败原因信息。

关于confirm和return官方文档上有下面这段信息,有必要了解一下:

When a rabbit template send operation completes, the channel is closed; this would preclude the reception of confirms or returns in the case when the connection factory cache is full (when there is space in the cache, the channel is not physically closed and the returns/confirms will proceed as normal). When the cache is full, the framework defers the close for up to 5 seconds, in order to allow time for the confirms/returns to be received. When using confirms, the channel will be closed when the last confirm is received. When using only returns, the channel will remain open for the full 5 seconds. It is generally recommended to set the connection factory’s channelCacheSize to a large enough value so that the channel on which a message is published is returned to the cache instead of being closed. You can monitor channel usage using the RabbitMQ management plugin; if you see channels being opened/closed rapidly you should consider increasing the cache size to reduce overhead on the server.

当RabbitTemplate发送操作完成后,通道被关闭;这将排斥了在连接工厂缓存已满的情况下接收确认或返回(当缓存有空间时,通道不会被物理关闭,返回/确认将正常进行)。当缓存已满时,框架会将关闭时间推迟5秒,以便有时间接收确认/返回信息。

  • 当使用确认时,通道将在收到最后一个确认时被关闭。
  • 当只使用返回时,通道将保持完整的5秒开放。

通常建议将连接工厂的 channelCacheSize 设置为足够大的值,以便将发布消息的通道返回到缓存中而不是关闭。您可以使用 RabbitMQ 管理插件监控通道的使用情况;如果您看到通道被快速打开/关闭,您应该考虑增加缓存大小以减少服务器上的开销。

异步的接收confirm和return时仍然需要走原来发送消息用到的那个Channel,如果那个Channel被关闭了,是收不到confirm/return信息的。好在根据以上说明,Channel会等到最后一个confirm接收到时才会close,所以应该也不用担心Channel被关闭而接收不到confirm的问题(异常会导致Channel提前关闭吗)。

2.13 重试机制

重试机制主要是解决网络不稳导致连接中断的问题。所以其实并不是重新发送消息,而是重新建立。

1
2
3
4
5
6
7
8

@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(Integer.MAX_VALUE);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
return retryTemplate;
}

如上,配置一个RetryTemplate,再设置到AmqpTemplate即可。

RetryTemplate与 spring-amqp 及rabbitmq都没有关系,这是 spring-retry 中的类。以上示例中使用了最简单的重试策略,不断重试,直到 Integer.MAX_VALUE 次为止。

对producer端而言,如果Connection正常,但发送消息失败是不会重试的,如指定的exchange不存在的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1条发送完毕

收到第1条confirm,ack:false, correlationData:null

17:26:09.544 [AMQP Connection 127.0.0.1:5672] ERROR [CachingConnectionFactory.java:1344] - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxx' in vhost 'vhost', class-id=60, method-id=40)

第2条发送完毕

收到第2条confirmack:false, correlationData:null

17:26:10.552 [AMQP Connection 127.0.0.1:5672] ERROR [CachingConnectionFactory.java:1344] - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxx' in vhost 'vhost', class-id=60, method-id=40)

第3条发送完毕

收到第3条confirmack:false, correlationData:null

17:26:11.559 [AMQP Connection 127.0.0.1:5672] ERROR [CachingConnectionFactory.java:1344] - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxx' in vhost 'vhost', class-id=60, method-id=40)

由Connection中断导致的发送消息失败,会进行重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
7条发送完毕

收到第7条confirm,ack:true, correlationData:null

8条发送完毕

收到第8条confirm,ack:true, correlationData:null

9条发送完毕

收到第9条confirm,ack:true, correlationData:null

10条发送完毕

收到第10条confirm,ack:true, correlationData:null

11条发送完毕

收到第11条confirm,ack:true, correlationData:null

17:01:44.000 [AMQP Connection 127.0.0.1:5672] ERROR [CachingConnectionFactory.java:1344] - Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)

17:01:44.005 [AMQP Connection 127.0.0.1:5672] WARN [ForgivingExceptionHandler.java:115] - An unexpected connection driver error occured (Exception message: Connection reset)

17:01:44.602 [http-nio-8080-exec-2] INFO [AbstractConnectionFactory.java:455] - Attempting to connect to: [localhost:5672]

...

17:02:23.076 [http-nio-8080-exec-2] INFO [AbstractConnectionFactory.java:455] - Attempting to connect to: [localhost:5672]

17:02:24.578 [http-nio-8080-exec-2] INFO [AbstractConnectionFactory.java:471] - Created new connection: amqpConnectionFactory#3412a3fd:20/SimpleConnection@41298ed [delegate=amqp://guest@0:0:0:0:0:0:0:1:5672/test, localPort= 55092]

12条发送完毕

收到第12条confirm,ack:true, correlationData:null

13条发送完毕

收到第13条confirm,ack:true, correlationData:null

14条发送完毕

收到第14条confirm,ack:true, correlationData:null

15条发送完毕

收到第15条confirm,ack:true, correlationData:null

没有配置重试,或到达了重试次数依然失败,会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
15条发送完毕

收到第15条confirm,ack:false, correlationData:null

17:41:13.571 [AMQP Connection 127.0.0.1:5672] ERROR [CachingConnectionFactory.java:1344] - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxx' in vhost 'paas_v3_vhost', class-id=60, method-id=40)

第16条发送完毕

收到第16条confirmack:false, correlationData:null

17:41:14.583 [AMQP Connection 127.0.0.1:5672] ERROR [CachingConnectionFactory.java:1344] - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxx' in vhost 'paas_v3_vhost', class-id=60, method-id=40)

17:41:15.322 [AMQP Connection 127.0.0.1:5672] WARN [ForgivingExceptionHandler.java:115] - An unexpected connection driver error occured (Exception message: Connection reset)

17:41:15.579 [http-nio-8080-exec-1] INFO [AbstractConnectionFactory.java:455] - Attempting to connect to: [localhost:5672]

17:41:17.609 [http-nio-8080-exec-1] ERROR [ExceptionHandler.java:41] - unknown error

org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect

at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62)

at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:484)

at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:626)

at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:576)

对Consumer端,如果采用的是 @RabbitListener 或其它类似异步接收消息的方式,则没必要配置重试。Consumer端有Ack机制,Connection中断导致RabbitMq收不到Ack信息,消息会重新入队(可能会导致同一条消息重复消费)。

对于直接调用RabbitTemplate的Receive系列方法获取消息的消费方式,则同消息发送端,没有Retry或Retry次数达到,则抛异常。

2.14 发送端的消息丢失

讨论两种情况可能产生的消息丢失:

  1. RabbitMq没挂,只是短暂的网络异常,连接可以恢复,消息发送出去但没有到exchange。
  2. RabbitMq挂了且长时间无法恢复,消息没有发出去;

(1)可恢复的Connection中断

配置开启Retry情况下,Connection中断会根据配置的retry策略尝试重连,但即使重新连上了,消息依然可能会丢失。

  • 本地测试,单线程间隔1毫秒循环发送1万条消息,模拟一个不断有消息发出的场景,在发送过程中手动关闭Rabbitmq服务再重新启动,模拟Connection短暂中断的场景。
  • 因为每一条消息都带有唯一的messageId(实际上是“线程名-序号”的形式),所以能轻易地从消费端读出所有消息之后找到丢失的消息。
  • 测试结果:发送1万条消息,实际收到9999条,丢失1条。

发送端通过ConfirmCallback打印出所有ack=false的消息:

1
2
3
4
5
6
7
8
9
10
11
----------打印ack=false的消息----------

size:4

pool-5-thread-1-5881

pool-5-thread-1-5882

pool-5-thread-1-5883

pool-5-thread-1-5884

消费端读出所有消息后,找出丢失的消息:

1
2
3
4
5
6
7
--------total:10000---------

----------contain size: 9999----------

----------absent size: 1----------

pool-5-thread-1-5883

可以看到,ack=false的消息有4条,但实际上只丢了一条。因为消息的发送和Confirm是异步进行的,如果在消息发送出去之后,异步的confirm回来之前,Connection中断,那么ConfirmCallback会立即被调用,并且ack=false,原因是Channel被关闭了。

  • 单线程情况下应该最多只会丢失一条,也有可能不会丢。
  • 多线程的情况下丢消息的现象就很严重了。本地测试5个线程发消息的情况,一共50000条消息,丢失了1500多条。但其实如果把这5个线程分到5个请求,一个请求只跑一个线程,情况会好很多,类似于上面单线程的情况。
  • 解决方案:首先可以想到的解决方案是事务,但事务一般不建议使用,为了rabbitmq的效率,退而求其次,采用confirm机制。

从上面的测试可以看到,在ConfirmCallback中ack=false的消息未必真的没有到达exchange,但没有到达exchange的消息ack一定是false,所以只需要将ack=false的消息重新发送一遍即可(这种方案会导致消息重复发送,后面再解决这一问题)。

ConfirmCallback的回调方法中没有Message对象,你可能会想从ConfirmCallback中拿到Message对象,当ack=false的时候将这个Message再重新发出去,但方法入参中没有Message对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class ReissueMessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
private static final Logger logger = LoggerFactory.getLogger(ReissueMessageConfirmCallback.class);

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause){
if (correlationData instanceof MessageCorrelationData) {
MessageCorrelationData messageCorrelationData = (MessageCorrelationData) correlationData;
logger.info("------------messageId: " + messageCorrelationData.getMessage().getMessageProperties().getMessageId() +
", ack: " + ack + ", cause:" + cause + "--------------");

if (!ack) {
SendFailedMessageHolder.add(messageCorrelationData);
}
}
}
}

注意到入参中有一个CorrelationData对象,同时在RabbitTemplate中有相应的send方法:

1
2
3
4
5

@Override
public void send(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) throws AmqpException {

}

这个方法AmqpTemplate中是没有的,是RabbitTemplate扩展的。所以,虽然ConfirmCallback不能直接拿到Message,但可以拿到CorrelationData,于是问题就解决了。

直接在ConfirmCallback中调用RabbitTemplate发送消息导致死锁,现在我们可以通过CorrelationData在ConfirmCallback中拿到Message对象了,我们也有办法拿到RabbitTemplate,为了避免bean的循环依赖,我是这样做的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

@Bean
public RabbitTemplate amqpTemplate(ConnectionFactory amqpConnectionFactory,
RetryTemplate retryTemplate,
MessageConverter messageConverter,
//RabbitTemplate.ConfirmCallback confirmCallback,
RabbitTemplate.ReturnCallback returnCallback){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(amqpConnectionFactory);
rabbitTemplate.setRetryTemplate(retryTemplate);
rabbitTemplate.setMessageConverter(messageConverter);
//rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.setConfirmCallback(new ReissueMessageConfirmCallback(rabbitTemplate));
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}

ReissueMessageConfirmCallback是自己写的一个实现类,将RabbitTemplate bean自己设置进去。然后我们在ConfirmCallback中发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class ReissueMessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
private static final Logger logger = LoggerFactory.getLogger(ReissueMessageConfirmCallback.class);
private RabbitTemplate rabbitTemplate;
public ReissueMessageConfirmCallback(RabbitTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate;
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause){
if (correlationData instanceof MessageCorrelationData) {
MessageCorrelationData messageCorrelationData = (MessageCorrelationData) correlationData;
String exchange = messageCorrelationData.getExchange();
String routingKey = messageCorrelationData.getRoutingKey();
Message message = messageCorrelationData.getMessage();
if (!ack) {
rabbitTemplate.send(exchange, routingKey, message, messageCorrelationData);
}
}
}
}

MessageCorrelationData是自己写的CorrelationData扩展类,增加了Message、exchange、routingKey属性。

在请求主线程发送1万条消息的过程中,将rabbitmq关闭,这时请求主线程和ConfirmCallback线程都在等待Connection恢复,然后重新启动rabbitmq,当程序重新建立Connection之后,这两个线程会死锁。

可行的方案:定时任务重发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Component
public class ReissueMessageSchedule implements InitializingBean {

@Autowired
private RabbitTemplate rabbitTemplate;

private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);

public void start(){
scheduledExecutorService.scheduleWithFixedDelay(new ReissueTask(rabbitTemplate), 10, 10, TimeUnit.SECONDS);
}

@Override
public void afterPropertiesSet(){
this.start();
}
}

public class ReissueTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ReissueTask.class);

private RabbitTemplate rabbitTemplate;

public ReissueTask(RabbitTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate;
}

@Override
public void run() {
List<MessageCorrelationData> messageCorrelationDataList = new ArrayList<>(SendFailedMessageHolder.getAll());
logger.info("------------------获取到" + messageCorrelationDataList.size() + "条ack=false的消息,准备重发------------------");
SendFailedMessageHolder.clear();
int i = 1;
for (MessageCorrelationData messageCorrelationData : messageCorrelationDataList) {
Message message = messageCorrelationData.getMessage();
String messageId = message.getMessageProperties().getMessageId();
logger.info("------------------重发第" + i + "条消息,id: " + messageId + "------------------");
i++;
message.getMessageProperties().setMessageId(messageId + "-重发");
rabbitTemplate.send(messageCorrelationData.getExchange(), messageCorrelationData.getRoutingKey(),
messageCorrelationData.getMessage(), messageCorrelationData);
}

logger.info("------------------重发完成------------------");
}
}

重发的消息会在原消息id后面跟上“重发”二字。

本地测试打印出的相关信息,发送端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

15:07:36.063 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:29] - ------------------获取到13条发送失败的消息,准备重发------------------

15:07:36.063 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第1条消息,id: reactor-http-nio-3-7439------------------

15:07:38.030 [pool-3-thread-1] INFO o.s.a.r.c.CachingConnectionFactory [AbstractConnectionFactory.java:455] - Attempting to connect to: [localhost:5672]

15:07:40.036 [reactor-http-nio-3] INFO o.s.a.r.c.CachingConnectionFactory [AbstractConnectionFactory.java:455] - Attempting to connect to: [localhost:5672]

...

15:08:14.188 [pool-3-thread-1] INFO o.s.a.r.c.CachingConnectionFactory [AbstractConnectionFactory.java:455] - Attempting to connect to: [localhost:5672]

15:08:16.190 [reactor-http-nio-3] INFO o.s.a.r.c.CachingConnectionFactory [AbstractConnectionFactory.java:455] - Attempting to connect to: [localhost:5672]

15:08:16.710 [reactor-http-nio-3] INFO o.s.a.r.c.CachingConnectionFactory [AbstractConnectionFactory.java:471] - Created new connection: amqpConnectionFactory#2127e66e:25/SimpleConnection@ee0d88b [delegate=amqp://guest@127.0.0.1:5672/test, localPort= 57212]

15:08:16.716 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第2条消息,id: reactor-http-nio-3-7440------------------

15:08:16.716 [reactor-http-nio-3] INFO c.l.l.r.p.c.RabbitmqController [RabbitmqController.java:102] - send message, id: reactor-http-nio-3-7452

send message: reactor-http-nio-3-7452

15:08:16.717 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第3条消息,id: reactor-http-nio-3-7441------------------

15:08:16.718 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第4条消息,id: reactor-http-nio-3-7442------------------

15:08:16.718 [reactor-http-nio-3] INFO c.l.l.r.p.c.RabbitmqController [RabbitmqController.java:102] - send message, id: reactor-http-nio-3-7453

send message: reactor-http-nio-3-7453

15:08:16.718 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第5条消息,id: reactor-http-nio-3-7443------------------

15:08:16.719 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第6条消息,id: reactor-http-nio-3-7444------------------

15:08:16.719 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第7条消息,id: reactor-http-nio-3-7445------------------

15:08:16.719 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第8条消息,id: reactor-http-nio-3-7446------------------

15:08:16.720 [reactor-http-nio-3] INFO c.l.l.r.p.c.RabbitmqController [RabbitmqController.java:102] - send message, id: reactor-http-nio-3-7454

send message: reactor-http-nio-3-7454

15:08:16.720 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第9条消息,id: reactor-http-nio-3-7447------------------

15:08:16.720 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第10条消息,id: reactor-http-nio-3-7448------------------

15:08:16.720 [AMQP Connection 127.0.0.1:5672] INFO c.l.l.r.p.r.ReissueMessageConfirmCallback [ReissueMessageConfirmCallback.java:21] - ------------messageId: reactor-http-nio-3-7451, ack: true, cause:null--------------

15:08:16.721 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第11条消息,id: reactor-http-nio-3-7449------------------

15:08:16.721 [reactor-http-nio-3] INFO c.l.l.r.p.c.RabbitmqController [RabbitmqController.java:102] - send message, id: reactor-http-nio-3-7455

send message: reactor-http-nio-3-7455

15:08:16.721 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第12条消息,id: reactor-http-nio-3-7450------------------

15:08:16.722 [reactor-http-nio-3] INFO c.l.l.r.p.c.RabbitmqController [RabbitmqController.java:102] - send message, id: reactor-http-nio-3-7456

send message: reactor-http-nio-3-7456

15:08:16.723 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第13条消息,id: reactor-http-nio-3-7451------------------

15:08:16.723 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:41] - ------------------重发完成------------------

reactor-http-nio-3是请求主线程,pool-3-thread-1是执行重发消息定时任务的线程。

从以上日志信息可以看出,当rabbitmq关闭的时候,主线程与重发线程都在尝试重连,直到rabbitmq重启完成恢复Connection。重发的消息有13条:reactor-http-nio-3-7439 ~ reactor-http-nio-3-7451。

再看消费端整理并打印出来的接收到的所有消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

--------should receive:10000---------

----------actually receive: 10013----------

----------absent messages:0---------

----------resend messages: 13----------

reactor-http-nio-3-7439-重发

reactor-http-nio-3-7440-重发

reactor-http-nio-3-7441-重发

reactor-http-nio-3-7442-重发

reactor-http-nio-3-7443-重发

reactor-http-nio-3-7444-重发

reactor-http-nio-3-7446-重发

reactor-http-nio-3-7447-重发

reactor-http-nio-3-7445-重发

reactor-http-nio-3-7449-重发

reactor-http-nio-3-7448-重发

reactor-http-nio-3-7450-重发

reactor-http-nio-3-7451-重发

可以看到,我们正确收到了上面那重发的13条消息。不过这次运气比较好,没有消息遗漏。同时,这里注意到一件事,消费端代码没有对重发的消息做排序,收到的重发消息的顺序与发送端重发消息的顺序是不匹配的,所以rabbitmq可能不保证先发出的消息一定先被接收。

下面是5个线程同时发送消息的测试结果,发送端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
15:42:40.602 [pool-3-thread-1] INFO  c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:29] - ------------------获取到642条发送失败的消息,准备重发------------------

15:42:40.602 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第1条消息,id: pool-5-thread-4-6951------------------

...

省略重连过程

...

15:43:07.628 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第2条消息,id: pool-5-thread-5-6605------------------

...

省略中间600多条消息的重发

...

15:43:07.794 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第641条消息,id: pool-5-thread-1-6704------------------

15:43:07.794 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:35] - ------------------重发第642条消息,id: pool-5-thread-4-7088------------------

15:43:07.794 [pool-3-thread-1] INFO c.l.l.r.p.schedule.task.ReissueTask [ReissueTask.java:41] - ------------------重发完成------------------

消费端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
--------should receive:50000---------

----------actually receive: 50014----------

----------absent messages:628---------

pool-5-thread-1-6583

pool-5-thread-1-6584

...

pool-5-thread-1-6705

pool-5-thread-2-6538

...

pool-5-thread-2-6653

pool-5-thread-3-6093

...

pool-5-thread-3-6218

pool-5-thread-4-6955

...

pool-5-thread-4-7087

pool-5-thread-5-6605

...

pool-5-thread-5-6733

pool-5-thread-5-6734

----------resend messages: 642----------

pool-5-thread-1-6580-重发

pool-5-thread-1-6581-重发

...

pool-5-thread-1-6705-重发

pool-5-thread-1-6706-重发

pool-5-thread-2-6537-重发

...

pool-5-thread-2-6654-重发

pool-5-thread-3-6093-重发

...

pool-5-thread-3-6219-重发

pool-5-thread-4-6951-重发

...

pool-5-thread-4-7088-重发

pool-5-thread-5-6604-重发

...

pool-5-thread-5-6734-重发

pool-5-thread-5-6735-重发

可以看到,丢失的消息被完美地包含在重发的消息里面了。

(2)长时间无法恢复的Connection中断

上面讨论了retry之后可以恢复Connection的情况,也有可能长时间retry之后依然不能恢复Connection,如rabbitmq挂掉的情况,不能一直retry下去阻塞接口调用。

这种情况是没有confirm的,因为消息都没有发出去。所以处理就更简单了:

1
2
3
4
5
try {
rabbitTemplate.send(messageCorrelationData.getExchange(), messageCorrelationData.getRoutingKey(), messageCorrelationData.getMessage(), messageCorrelationData);
} catch (AmqpConnectException e) {
SendFailedMessageHolder.add(messageCorrelationData);
}

retry失败或者没有retry机制都会抛出AmqpConnectException,catch之后将消息保存起来即可。

2.15 消费端的消息去重

如果发送端采用confirm机制来做丢失消息的重发,上面提到,可能会出现没有丢失的消息也被重发了,导致消息重复。

这个问题很容易解决,MessageProperties中是有messageId属性的,每条消息设置一个唯一的messageId即可。

1
2
Message message = messageConverter.toMessage(messageId, new MessageProperties());
message.getMessageProperties().setMessageId(messageId);

2.16 消息发送和接收使用不同的Connection

当一个服务同时作为消息发送端和接收端时,建议使用不同的Connection以避免一方出现故障影响到另一方。

并不需要做很多事情,只需RabbitTemplate配置中加一个属性设置即可:

1
rabbitTemplate.setUsePublisherConnection(true);

RabbitTemplate在创建Connection时,会根据这个boolean参数选择使用ConnectionFactory本身或者ConnectionFactory中的publisherConnectionFactory(也是一个ConnectionFactory)来创建,相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
   /**
* Create a connection with this connection factory and/or its publisher factory.
* @param connectionFactory the connection factory.
* @param publisherConnectionIfPossible true to use the publisher factory, if present.
* @return the connection.
* @since 2.0.2
*/
public static Connection createConnection(final ConnectionFactory connectionFactory, final boolean publisherConnectionIfPossible) {
if (publisherConnectionIfPossible) {
ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
if (publisherFactory != null) {
return publisherFactory.createConnection();
}
}

return connectionFactory.createConnection();
}

2.17 消息过期

在发送端,可通过如下方式设置消息过期时间:

1
message.getMessageProperties().setExpiration("30000");

这样,这条消息的有效期是30秒,30秒没有被消费掉会被丢弃。

2.18 Dead letter exchange

这个与spring-amqp无关,是rabbitmq的设置。将一个queue设置了 x-dead-letter-exchangex-dead-letter-routing-key 两个参数后,这个queue里丢弃的消息将会进入 dead letter exchange ,并route到相应的queue里去。

这里,被丢弃的消息包括:

1
2
3
4
5
The message is rejected (basic.reject or basic.nack) with requeue=false,

The TTL for the message expires; or

The queue length limit is exceeded.

三. 原理

3.1 架构

Spring AMQP的构成:

  • spring-amqp:包含 org.springframework.amqp.core软件包,提供不依赖任何特定 AMQP 代理实现或 Client 端库的通用抽象。最终用户代码将只能在抽象层上开发,因此在各个供应商的实现中将更具可移植性。
  • spring-rabbit:RabbitMQ实现。

3.2 核心类

(1)Message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Message {

private final MessageProperties messageProperties;

private final byte[] body;

public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}

public byte[] getBody() {
return this.body;
}

public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}

MessageProperties 接口定义了一些常用属性(如Header、exchange、routing key、消息创建时间timestamp等),也可以通过header扩展:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class MessageProperties implements Serializable {
private static final long serialVersionUID = 1619000546531112290L;
public static final String CONTENT_TYPE_BYTES = "application/octet-stream";
public static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
public static final String CONTENT_TYPE_SERIALIZED_OBJECT = "application/x-java-serialized-object";
public static final String CONTENT_TYPE_JSON = "application/json";
public static final String CONTENT_TYPE_JSON_ALT = "text/x-json";
public static final String CONTENT_TYPE_XML = "application/xml";
public static final String SPRING_BATCH_FORMAT = "springBatchFormat";
public static final String BATCH_FORMAT_LENGTH_HEADER4 = "lengthHeader4";
public static final String SPRING_AUTO_DECOMPRESS = "springAutoDecompress";
public static final String X_DELAY = "x-delay";
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE;
public static final Integer DEFAULT_PRIORITY;
private final Map<String, Object> headers = new HashMap();
private volatile Date timestamp;
private volatile String messageId;
private volatile String userId;
private volatile String appId;
private volatile String clusterId;
private volatile String type;
private volatile byte[] correlationId;
private volatile String correlationIdString;
private volatile String replyTo;
private volatile String contentType = "application/octet-stream";
private volatile String contentEncoding;
private volatile long contentLength;
private volatile boolean contentLengthSet;
private volatile MessageDeliveryMode deliveryMode;
private volatile String expiration;
private volatile Integer priority;
private volatile Boolean redelivered;
private volatile String receivedExchange;
private volatile String receivedRoutingKey;
private volatile String receivedUserId;
private volatile long deliveryTag;
private volatile boolean deliveryTagSet;
private volatile Integer messageCount;
private volatile String consumerTag;
private volatile String consumerQueue;
private volatile Integer receivedDelay;
private volatile MessageDeliveryMode receivedDeliveryMode;
private transient volatile Type inferredArgumentType;
private transient volatile Method targetMethod;
private transient volatile Object targetBean;

// 扩展属性
public void setHeader(String key, Object value) {
this.headers.put(key, value);
}

...
}

(2)Exchange

Exchange 接口表示一个 AMQP 交换器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Exchange {

// 名称
String getName();

// 交换器类型
String getExchangeType();

// 是否持久化
boolean isDurable();

// 是否自动删除
boolean isAutoDelete();

// 其他一些结构化参数
Map<String, Object> getArguments();

}

Exchange:DirectTopicFanoutHeaders 。库中可以找到每种类型的Exchange 接口的实现。不同交换器类型可以参考:RabbitMQ(一)简介和入门

(3)Queue

Queue 类表示消息队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Queue  {

// 队列名称
private final String name;

// 是否持久化
private volatile boolean durable;

// 是否排他,仅对首次声明它的连接可见,并在连接断开时自动删除,适用于一个客户端同时发送和读取消息的应用场景。
private volatile boolean exclusive;

// 是否自动删除
private volatile boolean autoDelete;

// 其他一些参数
private volatile Map<String, Object> arguments;

/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}

// Getters and Setters omitted for brevity

}

构造函数采用队列名称。取决于实现方式,Management 模板可以提供用于生成唯一命名的队列的方法。这样的队列可用作“答复”地址或其他“临时”情况。因此,自动生成的 Queue 的 exclusive 和 autoDelete 属性都将设置为 true 。

(4)Binding

生产者发送到 Exchange,而消费者从队列接收,将队列连接到 Exchange 的绑定至关重要。Spring AMQP 中定义了一个 Binding 类来表示这些连接。

可以使用固定的路由键将队列绑定到 DirectExchange。

1
new Binding(someQueue, someDirectExchange, "foo.bar")

可以使用路由模式将队列绑定到 TopicExchange。

1
new Binding(someQueue, someTopicExchange, "foo.*")

可以使用路由键将队列绑定到 FanoutExchange。

1
new Binding(someQueue, someFanoutExchange)

还提供 BindingBuilder 以促进“Fluent 的 API”样式。

1
2
// 对bind()方法使用静态导入样式效果很好
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

AmqpAdmin 类可以使用绑定实例来实际触发代理上的绑定操作,可以在@Configuration 类中使用 Spring 的 @Bean -style 定义 Binding 实例。

3.3 连接和资源Management

这一部分区别于核心类是基于具体实现的,RabbitMQ是目前唯一支持的实现。

(1)ConnectionFactory-连接工厂类

  • ConnectionFactory 接口是用于 Management 与 RabbitMQ 代理的连接的中央组件。
  • ConnectionFactory 的职责是提供org.springframework.amqp.rabbit.connection.Connection 的实例,该实例是 com.rabbitmq.client.Connection 的包装器。
  • 提供的唯一具体实现是 CachingConnectionFactory ,默认情况下,它构建一个可以由应用程序共享的单个连接代理。
  • 可以共享连接,因为与 AMQP 进行消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中的 Connection 和 Session 之间的关系)。可以想象,连接实例提供了 createChannel 方法。
  • CachingConnectionFactory 实现支持这些通道的缓存,并且根据它们是否是事务性的,为通道维护单独的缓存。创建 CachingConnectionFactory 的实例时,可以通过构造函数提供hostname。还应该提供username和password属性。如果要配置通道缓存的大小(默认值为25),则也可以在此处调用 setChannelCacheSize() 方法。
  • 从1.3版本开始,可以将 CachingConnectionFactory 配置为缓存连接以及通道。在这种情况下,每次对 createConnection() 的调用都会创建一个新连接(或从缓存中检索一个空闲的连接)。关闭连接会将其返回到缓存(如果尚未达到缓存大小)。在此类连接上创建的通道也将被缓存。
  • 在某些环境中,使用单独的连接可能很有用,例如从 HA 群集中使用负载,并与负载均衡器一起连接到不同的群集成员。将 cacheMode 设置为CacheMode.CONNECTION
  • 从1.5.5版本开始,提供了一个新属性 connectionLimit 。设置此选项后,它将限制允许的连接总数。设置后,如果达到限制,则使用 channelCheckoutTimeLimit await 连接变为空闲。如果超过时间,则抛出 AmqpTimeoutException
  • 当使用大量连接时,应考虑在 CachingConnectionFactory 上设置自定义executor 。然后,所有连接将使用同一执行程序,并且可以共享其线程。执行程序的线程池应该是无界的,或者应为预期的使用率进行适当设置(通常每个连接至少一个线程)。如果在每个连接上创建多个通道,则池大小将影响并发性,因此,变量(或简单缓存)线程池执行程序将是最合适的。
  • 从1.6版本开始,默认的通道缓存大小已从1增加到25。在大容量,多线程的环境中,小的缓存意味着将以较高的速率创建和关闭通道。增加默认缓存大小将避免这种开销。您应该通过 RabbitMQ Admin UI 监视正在使用的通道,如果看到许多正在创建和关闭的通道,请考虑进一步增加缓存大小。缓存将仅按需增长(以适应应用程序的并发要求),因此此更改不会影响现有的小批量应用程序。
  • 从1.4.2版本开始,CachingConnectionFactory 具有属性channelCheckoutTimeout 。当此属性大于零时,channelCacheSize 成为可在连接上创建的通道数的限制。如果达到限制,则调用线程将阻塞,直到某个通道可用或达到此超时为止,在这种情况下将引发 AmqpTimeoutException
  • 框架内使用的通道(例如RabbitTemplate)将可靠地返回到缓存。如果您在框架外部创建通道(例如,通过直接访问连接并调用 createChannel() ),则必须(通过关闭)可靠地返回它们(可能在 finally 块中),以避免耗尽通道。
1
2
3
4
5
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

XML :

1
2
3
4
5
6
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>

可以使用 Rabbit 名称空间快速便捷地创建ConnectionFactory

1
<rabbit:connection-factory id="connectionFactory"/>

在大多数情况下,这是可取的,因为框架可以为您选择最佳的默认值。创建的实例将是 CachingConnectionFactory 。请记住,通道的默认缓存大小为 25。如果要缓存更多通道,请通过channelCacheSize属性设置一个较大的值。在 XML 中,它看起来像这样:

1
2
3
4
5
6
7
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>

(2)ConnectionNameStrategy-连接命名

从1.7 版本开始,提供了 ConnectionNameStrategy 以便注入AbstractionConnectionFactory 。生成的名称用于目标 RabbitMQ 连接的特定于应用程序的标识。如果 RabbitMQ 服务器支持,则连接名称将显示在 ManagementUI 中。此值不必是唯一的,也不能用作连接标识符,例如在 HTTP API 请求中。该值应该是人类可读的,并且是connection_name键下ClientProperties的一部分。

可以使用一个简单的 Lambda:

1
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

ConnectionFactory 参数可用于通过某些逻辑来区分目标连接名称。默认情况下,AbstractConnectionFactorybeanName ,代表对象的十六进制字符串和内部计数器用于生成 connection_name<rabbit:connection-factory> 名称空间组件也随 connection-name-strategy 属性一起提供。

提供了实现 SimplePropertyValueConnectionNameStrategy ,该实现将连接名称设置为应用程序属性。将其声明为 @Bean 并将其注入连接工厂:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public ConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}

使用 Spring Boot 及其自动配置的连接工厂时,仅需要声明 ConnectionNameStrategy@Bean。引导程序将自动检测到该 bean,并将其连接到工厂。

(3)阻止的连接和资源限制

RabbitMQ内存警告(Memory Alarm)连接可能被阻止与 Broker 进行交互。从2.0开始,提供 com.rabbitmq.client.BlockedListener ,以通知连接被阻止和未被阻止的事件。此外,AbstractConnectionFactory 通过其内部BlockedListener 实现分别发出 ConnectionBlockedEventConnectionUnblockedEvent 。这些使您能够提供应用程序逻辑,以对代理程序上的问题做出适当反应,并采取一些纠正措施。

当应用程序配置有单个 CachingConnectionFactory 时(默认情况下使用 Spring Boot 自动配置),当代理阻止连接时,应用程序将停止工作。当它被 broker 阻止时,它的任何 Client 都会停止工作。如果我们在同一应用程序中具有生产者和消费者,那么当生产者阻塞连接时,由于代理上不再有资源,并且由于连接被阻塞,消费者也无法释放它们,我们可能最终陷入阻塞。

为了缓解该问题,建议再使用一个具有相同选项的单独的 CachingConnectionFactory 实例:一个用于生产者,一个用于消费者。对于在消费者线程上执行的事务生产者,不可能使用单独的 CachingConnectionFactory ,因为他们应该重用与消费者事务相关联的Channel

从版本2.0.2开始,RabbitTemplate 具有配置选项,以自动使用第二个连接工厂,除非正在使用事务。发布者连接的 ConnectionNameStrategy 与主要策略相同,在调用方法的结果后附加 .publisher

从1.7.7开始,提供了一个 AmqpResourceNotAvailableException ,例如当SimpleConnection.createChannel() 无法创建Channel时抛出该AmqpResourceNotAvailableException ,因为达到了 channelMax 的限制并且缓存中没有可用的通道。可以在 RetryPolicy 中使用此异常,以在某些回退之后恢复操作。

(4)配置基础 Client 端连接工厂

CachingConnectionFactory 使用 RabbitClient 端 ConnectionFactory 的实例;在 CachingConnectionFactory 上设置等效属性时,会传递许多配置属性(例如host, port, userName, password, requestedHeartBeat, connectionTimeout)。要设置其他属性(例如clientProperties),请定义 Rabbit 工厂的实例,并使用CachingConnectionFactory的适当构造函数为其提供引用。如上所述使用命名空间时,请在connection-factory属性中提供对已配置工厂的引用。为方便起见,提供了工厂 bean 来帮助在 Spring 应用程序上下文中配置连接工厂,如下一节所述。

1
2
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
  • 4.0.x客户端默认启用了自动恢复功能;虽然与此功能兼容,但Spring AMQP有自己的恢复机制,一般不需要客户端的恢复功能。
  • 建议禁用amqp-client自动恢复功能,以避免在代理可用但连接尚未恢复的情况下得到AutoRecoverConnectionNotCurrentlyOpenException。您可能会注意到这种异常,例如,当RabbitTemplate中配置了RetryTemplate时,甚至在故障转移到集群中的另一个代理时也是如此。
  • 由于自动恢复连接是在一个计时器上恢复的,因此使用 Spring AMQP 的恢复机制可以更快地恢复连接。从 1.7.1 版开始,Spring AMQP 禁用它,除非您明确创建自己的 RabbitMQ 连接工厂并将其提供给 CachingConnectionFactory。由 RabbitConnectionFactoryBean 创建的 RabbitMQ ConnectionFactory 实例也将默认禁用该选项。

(5)RabbitConnectionFactoryBean 和配置 SSL

从1.4开始,提供了一个方便的RabbitConnectionFactoryBean,以使用依赖注入的方式在Client端连接工厂上配置 SSL 属性。

1
2
3
4
5
6
7
8
9
10
11
12
<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />

<bean id="clientConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/>
</bean>

省略keyStoretrustStore配置以通过 SSL 进行连接而无需证书验证。密钥和信任库配置可以如下提供:

sslPropertiesLocation属性是 Spring Resource,它指向包含以下键的属性文件:

1
2
3
4
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStoretruststore是指向 Store 的 Spring Resources。通常,此属性文件将由 os 保护,并且应用程序具有读取访问权限。

从 Spring AMQP 版本 1.5 开始,可以直接在工厂 bean 上设置这些属性。如果同时提供了离散属性和sslPropertiesLocation,则后者中的属性将覆盖离散值。

(6)路由连接工厂

从1.3开始,引入了AbstractRoutingConnectionFactory。这提供了一种机制,可在运行时为多个ConnectionFactories配置 Map 并由某个lookupKey确定目标ConnectionFactory

Spring AMQP 提供了SimpleRoutingConnectionFactory,它从SimpleResourceHolder获取当前线程绑定的lookupKey

1
2
3
4
5
6
7
8
9
10
11
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyService {

@Autowired
private RabbitTemplate rabbitTemplate;

public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
// 使用后解除绑定资源很重要
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}

}

从版本 1.4 开始,RabbitTemplate支持 SpEL sendConnectionFactorySelectorExpressionreceiveConnectionFactorySelectorExpression 属性,这些属性在每个 AMQP 协议交互操作(sendsendAndReceivereceivereceiveAndReply)上进行评估,对于提供的AbstractRoutingConnectionFactory 解析为 lookupKey 值。表达式中可以使用 Bean 引用,例如"@vHostResolver.getVHost(#root)"。对于send操作,要发送的消息是根评估对象。对于receive操作,queueName 是根评估对象。

路由算法为:

  • 如果 selectors 表达式为null,或者计算为null,或者提供的ConnectionFactory不是AbstractRoutingConnectionFactory的实例,则所有操作都像以前一样,取决于提供的ConnectionFactory实现。
  • 如果评估结果不是null,但是没有针对该lookupKey的目标ConnectionFactory,并且AbstractRoutingConnectionFactory配置为lenientFallback = true,则会发生相同的情况。
  • 当然,在AbstractRoutingConnectionFactory的情况下,它会回退到基于determineCurrentLookupKey()routing实现。但是,如果lenientFallback = false,则抛出IllegalStateException

命名空间支持还在<rabbit:template>组件上提供了send-connection-factory-selector-expressionreceive-connection-factory-selector-expression属性。

同样从1.4开始,可以在侦听器容器中配置路由连接工厂。在这种情况下,队列名称列表将用作查找关键字。例如,如果您使用setQueueNames("foo", "bar")配置容器,则查找键将为"[foo,bar]"(无空格)。

从版本 1.6.9 开始,可以使用侦听器容器上的setLookupKeyQualifier向查找键添加限定符。

例如,这将允许侦听具有相同名称但在不同虚拟主机中的队列(每个虚拟主机中都有一个连接工厂)。

例如,在使用查找键限定符foo和侦听队列bar的容器的情况下,用于注册目标连接工厂的查找键将是foo[bar]

(7)队列相似性和LocalizedQueueConnectionFactory

在集群中使用HA队列时,为了获得最佳性能,可能需要连接到主队列所在的物理代理。 CachingConnectionFactory可以配置多个代理地址;这是为了进行故障转移,Client 端将尝试按 Sequences 连接。

LocalizedQueueConnectionFactory使用 Management 插件提供的 REST API 来确定要控制队列的节点。然后,它创建(或从缓存中检索)CachingConnectionFactory,该CachingConnectionFactory将仅连接到该节点。如果连接失败,那么将确定新的主节点,并且使用者将连接到该主节点。

LocalizedQueueConnectionFactory配置有默认的连接工厂,以防无法确定队列的物理位置,在这种情况下,它将正常连接到群集。

LocalizedQueueConnectionFactoryRoutingConnectionFactory,而SimpleMessageListenerContainer使用队列名称作为查找关键字。

注意

  • 由于使用队列名称进行查找这个原因,只有在将容器配置为侦听单个队列时,才能使用LocalizedQueueConnectionFactory
  • 必须在每个节点上启用 RabbitMQManagement 插件。
  • 此连接工厂用于长期连接,例如SimpleMessageListenerContainer使用的连接。它不适用于短连接,例如用于RabbitTemplate,因为在构建连接之前调用 REST API 会产生开销。同样,对于发布操作,队列是未知的,并且无论如何该消息都会发布给所有集群成员,因此查找节点的逻辑几乎没有价值。

这是一个示例配置,使用 Spring Boot 的 RabbitProperties 配置工厂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Autowired
private RabbitProperties props;

private final String[] adminUris = { "http://host1:15672", "http://host2:15672" };

private final String[] nodes = { "[emailprotected]", "[emailprotected]" };

@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}

@Bean
public ConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
this.adminUris, this.nodes,
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}

请注意,前三个参数是addressesadminUrisnodes的数组。这些是适当的,因为当容器尝试连接到队列时,它确定队列在哪个节点上被控制,并连接到同一阵列位置中的地址。

(8)发布者确认并return

通过将CachingConnectionFactorypublisherConfirmspublisherReturns属性分别设置为’true’,可以支持确认和返回的消息。

设置这些选项后,工厂创建的Channel将被包装在PublisherCallbackChannel中,方便回调。当获得这样的 Channels 时,Client 端可以向Channel注册PublisherCallbackChannel.ListenerPublisherCallbackChannel实现包含将确认/返回路由到适当的侦听器的逻辑。

(9)连接和 Channels 监听器

连接工厂支持注册ConnectionListenerChannelListener实现。这使您可以接收有关连接和通道相关事件的通知。 (构建连接时,RabbitAdmin使用ConnectionListener来执行声明。

1
2
3
4
5
6
7
8
9
10
11
12
@FunctionalInterface
public interface ConnectionListener {

void onCreate(Connection connection);

default void onClose(Connection connection) {
}

default void onShutDown(ShutdownSignalException signal) {
}

}
1
2
3
4
5
6
7
8
9
@FunctionalInterface
public interface ChannelListener {

void onCreate(Channel channel, boolean transactional);

default void onShutDown(ShutdownSignalException signal) {
}

}

(10)记录 Channels 关闭事件

在 1.5 版中引入了一种使用户能够控制日志记录级别的机制。

CachingConnectionFactory使用默认策略记录通道关闭,如下所示:

  • 正常通道关闭(200 OK)不会被记录。
  • 如果通道由于被动队列声明失败而关闭,那么它将在调试级别记录。
  • 如果通道由于basic.consume由于特殊的使用者条件而被拒绝而关闭,则它将以 INFO 级别记录。
  • 其他所有日志均以 ERROR 级别记录。

若要修改此行为,请在其closeExceptionLogger属性中将自定义ConditionalExceptionLogger注入到CachingConnectionFactory中。

(11)运行时缓存属性

从1.6开始,CachingConnectionFactory 通过getCacheProperties()方法提供了缓存统计信息。这些统计信息可用于调整缓存以在 Producing 对其进行优化。例如,高水位标记可用于确定是否应增加缓存大小。如果等于缓存大小,则可能要考虑进一步增加。

表 3.1. CacheMode.CHANNEL 的缓存属性

Property Meaning
connectionName ConnectionNameStrategy生成的连接的名称。
channelCacheSize 当前配置的允许空闲的最大通道数。
localPort 连接的本地端口(如果有)。这可用于与 RabbitMQ Admin UI 上的连接/通道关联。
idleChannelsTx 当前空闲(缓存)的事务通道的数量。
idleChannelsNotTx 当前空闲(缓存)的非事务通道的数量。
idleChannelsTxHighWater 已同时空闲(缓存)的最大事务通道数。
idleChannelsNotTxHighWater 非事务通道的最大数量已被同时空闲(缓存)。

表 3.2. CacheMode.CONNECTION 的缓存属性

Property Meaning
connectionName:<localPort> ConnectionNameStrategy生成的连接的名称。
openConnections 表示与代理的连接的连接对象的数量。
channelCacheSize 当前配置的允许空闲的最大通道数。
connectionCacheSize 当前配置的允许空闲的最大连接数。
idleConnections 当前空闲的连接数。
idleConnectionsHighWater 并发空闲的最大连接数。
idleChannelsTx:<localPort> 该连接当前空闲(缓存)的事务通道的数量。属性名称的 localPort 部分可用于与 RabbitMQ Admin UI 上的连接/通道关联。
idleChannelsNotTx:<localPort> 该连接当前空闲(缓存)的非事务通道的数量。属性名称的 localPort 部分可用于与 RabbitMQ Admin UI 上的连接/通道关联。
idleChannelsTxHighWater:
<localPort> 同时空闲(缓存)的最大事务通道数。属性名称的 localPort 部分可用于与 RabbitMQ Admin UI 上的连接/通道关联。
idleChannelsNotTxHighWater: <localPort> 已同时空闲(缓存)的非事务通道的最大数量。属性名称的 localPort 部分可用于与 RabbitMQ Admin UI 上的连接/通道关联。

cacheMode属性(还包括CHANNELCONNECTION)。

图 3.1. JVisualVM 示例

cacheStats

(12)RabbitMQ 自动连接/拓扑恢复

从 Spring AMQP 的第一个版本开始,该框架在代理发生故障的情况下提供了自己的连接和通道恢复。另外,RabbitAdmin 将在重新构建连接时重新声明任何基础结构Bean(队列等)。因此,它不依赖amqp-client库现在提供的Auto Recovery

Spring AMQP 现在使用amqp-client4.0.x版本,默认情况下启用了自动恢复。如果愿意,Spring AMQP 仍可以使用其自己的恢复机制,在 Client 端中将其禁用(通过将基础RabbitMQ connectionFactory设置为falseautomaticRecoveryEnabled属性)。

但是,该框架与启用的自动恢复完全兼容。这意味着您在代码中创建的所有使用者(可能通过RabbitTemplate.execute())都可以自动恢复。

3.4 添加自定义 Client 端连接属性

CachingConnectionFactory 允许访问基础连接工厂,以允许例如设置自定义 Client 端属性:

1
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");

查看连接时,这些属性会显示在 RabbitMQ Management UI 中。

3.5 AmqpTemplate

AmqpTemplate接口定义了用于发送和接收消息的所有基本操作。

未完待续。

(1)添加重试功能

(2)发布是异步的-如何检测成功和失败

(3)发布者确认并return

(4)Scoped Operations

(5)Messaging integration

(6)已验证的用户ID

(7)使用单独的连接

3.6 发送消息

(1)Message Builder

(2)Publisher Returns

(3)Batching

3.7 接收消息

(1)Polling Consumer

(2)Asynchronous Consumer

(3)Message Listener

(4)MessageListenerAdapter

(5)Container

(6)Consumer Priority

(7)auto-delete Queues

(8)Batched Messages

(9)Consumer Events

(10)Consumer Tags

(11)注解驱动的监听器端点

四. 源码剖析

4.1 RabbitMQ消费流程

(1)启动流程

  1. 通过BeanPostProcessor扫描所有的Bean中存在的 @RabbitListener 注解及相应的Method;

  2. RabbitListenerContainerFactory根据配置为每一个 @RabbitListener 注解创建一个MessageListenerContainer,持有 @RabbitListener 注解及Method信息;

  3. 初始化MessageListenerContainer,主要是循环依次创建Consumer(AsyncMessageProcessingConsumer),并启动Consumer;

    创建Consumer,过程包括:

    • 创建AMQConnection,仅第一次创建。
    • 创建AMQChannel,每个Consumer都会创建。
    • 发送消费queue的请求(basic.consume),接收并处理消息。
  4. AMQConnection持有连接到Rabbitmq Server的Socket,创建完成后启动MainLoop循环从Socket流中读取Frame,此时流中没有消息,因为Channel还没创建完成;

  5. 创建AMQChannel(一个AMQConnection中持有多个AMQChannel),并将创建完成的Channel注册到AMQConnection持有的ConsumerWorkService,实际就是添加到WorkPool的Map里面去,此时Socket流中也没有消息,因为Channel还没有与Queue绑定;
    创建完成的AMQChannel的代理返回给Consumer,Consumer通过Channel发送消费Queue的请求到Rabbitmq Server(绑定成功),此时还没开始处理消息,但Socket流中已经有消息,并且已经被Connection读取到内存(即 BlockingQueue<Runnable> )中,并且已经开始向 BlockingQueue<Delivery> 分发;

  6. Consumer启动循环,从 BlockingQueue<Delivery> 中取消息,利用MessageListenerContainer中持有的Method反射调用 @RabbitListener 注解方法处理消息。

(2)消费流程

  1. Rabbitmq Server往Socket流中写入字节。
  2. AMQConnection启动一个main loop thread来跑MainLoop,不断从Socket流中读取字节转换成Frame对象,这是每个connection唯一的数据来源。
  3. Consumer启动后,Connection读取到Frame。从basic.deliver开始是消息的内容,每条消息分成三个Frame:
    • 第一个是method,basic.deliver代表这是一个消息,后面一定会再跟着两个Frame;
    • 第二个是message header;
    • 第三个是message body,body读取之后将三个Frame整合到一起转换成一条完整的deliver命令。
  4. AMQConnection根据读取到的Frame中的type决定要怎么处理这个Frame( heartbeat(8) do nothing )其它的根据channel编号交给相应的AMQChannel去处理,(编号为0的是特殊的channel,消息相关的用的都是编号非0的channel),消息都会拿着这个编号到ChannelManager找对应的ChannelN处理。
  5. ChannelN经过一系列中间过程由Frame(消息是三个Frame)得到了Runnable,将 <ChannelN, Runnable> put 到 ConsumerWorkService 持有的 WorkPool 里面的一个 Map<Channel, BlockingQueue<Runnable>> 里面去。这样这个Runnable就进入了与ChannelN对应的 BlockingQueue<Runnable>(写死的size=1000)里面了。
  6. execute一个WorkPoolRunnable,执行的任务是:
    • 从WorkPool中找出一个ready状态的ChannelN,把这个ChannelN设为inProgress状态;
    • 从对应的 BlockingQueue<Runnable> 中取最多16个Runnable(写死)在WorkPoolRunnable的线程里依次执行(注意:此处不再另开线程,所以可能会堵塞当前线程,导致这个ChannelN长时间处于inProgress状态),执行完后将当前ChannelN状态改为ready,并在当前线程execute另一个WorkPoolRunnable。
  7. BlockingQueue<Runnable> 里面的Runnable执行的逻辑是:构造一个Delivery并put到与ChannelN对应的AsyncMessageProcessingConsumer持有的 BlockingQueue<Delivery>(size=prefetchCount可配置)里面去(如果消息处理速度太慢,BlockingQueue<Delivery> 已满,此处会堵塞)。
  8. 每个AsyncMessageProcessingConsumer都有一个独立的线程在循环从 BlockingQueue<Delivery> 一次读取一个Delivery转换成Message反射调用 @RabbitListener 注解方法来处理。

Frame对象结构如下:

  • type:指定当前Frame的类型,如 method(1)message header(2)message body(3)heartbeat(8) 等;
  • channel:channel的编号,从0~n排列,指定当前Frame需要交给哪个channel处理。channel-0为一类,channel-n为一类。
    • channel-0是一个匿名类,用来处理特殊Frame,如connection.start。
    • channel-n都是ChannelN类,由ChannelManager类统一管理。
  • payload:当前Frame的具体内容。

(3)ack消费模式

根据以上对消费过程的分析,将无ack模式与ack模式进行对比。

  • 无ack模式(AcknowledgeMode.NONE)

    • Server端行为:
      • Rabbitmq Server默认推送的所有消息都已经消费成功,会不断地向消费端推送消息。
      • 因为Rabbitmq Server认为推送的消息已被成功消费,所以推送出去的消息不会暂存在server端。
    • 消息丢失的风险:
      • BlockingQueue<Runnable> 堆满时( BlockingQueue<Delivery> 一定会先满),server端推送消息会失败,然后断开Connection。消费端从Socket读取Frame将会抛出SocketException,触发异常处理,shutdown掉Connection和所有的Channel,Channel shutdown后WorkPool中的Channel信息(包括Channel inProgress、channel ready以及Map)全部清空,所以 BlockingQueue<Runnable> 中的数据会全部丢失。
      • 此外,服务重启时也需对内存中未处理完的消息做必要的处理,以免丢失。而在Rabbitmq Server,Connection断掉后就没有消费者去消费这个queue,因此在server端会看到消息堆积的现象。
  • 有ack模式(AcknowledgeMode.AUTO,AcknowledgeMode.MANUAL):AcknowledgeMode.MANUAL模式需要人为地获取到Channel之后调用方法向Server发送ack(或消费失败时的nack)信息。AcknowledgeMode.AUTO模式下,由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。

    • server端行为:
      • Rabbitmq Server推送给每个Channel的消息数量有限制,会保证每个Channel没有收到ack的消息数量不会超过prefetchCount。
      • Server端会暂存没有收到ack的消息,等消费端ack后才会丢掉;
      • 如果收到消费端的nack(消费失败的标识)或Connection断开没收到反馈,会将消息放回到原队列头部。
    • 性能:这种模式不会丢消息,但效率较低,因为server端需要等收到消费端的答复之后才会继续推送消息,当然,推送消息和等待答复是异步的,可适当增大prefetchCount提高效率。

注意,有ack的模式下,需要考虑 setDefaultRequeueRejected(false) ,否则当消费消息抛出异常没有catch住时,这条消息会被rabbitmq放回到queue头部,再被推送过来,然后再抛异常再放回…死循环了。设置false的作用是抛异常时不放回,而是直接丢弃,所以可能需要对这条消息做处理,以免丢失。

对比:

  • 无ack模式:效率高,存在丢失大量消息的风险。
  • 有ack模式:效率低,不会丢消息。

4.2 RabbitTemplate

(1)接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
   @Override
public Message receive(String queueName) {
// receiveTimeOut参数为0,直接获取消息,不等待,获取不到返回null;否则会等待一段时间。
return this.receiveTimeout == 0L ? this.doReceiveNoWait(queueName) : this.receive(queueName, this.receiveTimeout);
}

@Override
public Message receive(final String queueName, final long timeoutMillis) {
// 通过execute传入Lambda表达式执行
Message message = execute(channel -> {
// 初始化Delivery
Delivery delivery = consumeDelivery(channel, queueName, timeoutMillis);
if (delivery == null) {
return null;
}
else {
if (isChannelLocallyTransacted(channel)) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
channel.txCommit();
}
else if (isChannelTransacted()) {
ConnectionFactoryUtils.registerDeliveryTag(getConnectionFactory(), channel,
delivery.getEnvelope().getDeliveryTag());
}
else {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
return buildMessageFromDelivery(delivery);
}
});
logReceived(message);
return message;
}

Message是通过调用execute方法得到的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

@Override
public <T> T execute(ChannelCallback<T> action) {
return execute(action, getConnectionFactory());
}

@SuppressWarnings("unchecked")
private <T> T execute(final ChannelCallback<T> action, final ConnectionFactory connectionFactory) {
// 若启用RetryTemplate重试机制
if (this.retryTemplate != null) {
try {
return this.retryTemplate.execute(
(RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
(RecoveryCallback<T>) this.recoveryCallback);
}
catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
}
// 否则执行doExecute
else {
return doExecute(action, connectionFactory);
}
}

// 创建了Connection和Channel,执行action.doInRabbit()方法得到Message,关闭Channel和Connection。当然,这里Connection和Channel的创建和关闭都不一定是真的创建和关闭,与具体的实现有关,比如CachingConnectionFactory,它的实现就是有缓存的
private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory) {

...
if (channel == null) {
// 初始化channel和connection
if (isChannelTransacted()) {
resourceHolder = ConnectionFactoryUtils.
getTransactionalResourceHolder(connectionFactory, true, this.usePublisherConnection);
channel = resourceHolder.getChannel();
if (channel == null) {
ConnectionFactoryUtils.releaseResources(resourceHolder);
throw new IllegalStateException("Resource holder returned a null channel");
}
}
else {
connection = ConnectionFactoryUtils.createConnection(connectionFactory,
this.usePublisherConnection); // NOSONAR - RabbitUtils closes
if (connection == null) {
throw new IllegalStateException("Connection factory returned a null connection");
}
try {
channel = connection.createChannel(false);
if (channel == null) {
throw new IllegalStateException("Connection returned a null channel");
}
}
catch (RuntimeException e) {
RabbitUtils.closeConnection(connection);
throw e;
}
}
}

...

try {
...
// action.doInRabbit()方法的实现逻辑就要再回到上面的receive方法,这里的action就是在那个receive方法传入的一个ChannelCallback的匿名内部实现类。
return action.doInRabbit(channel);
}
catch (Exception ex) {
...
}
finally {
if (!invokeScope) {
if (resourceHolder != null) {
ConnectionFactoryUtils.releaseResources(resourceHolder);
}
else {
RabbitUtils.closeChannel(channel);
RabbitUtils.closeConnection(connection);
}
}
}
}

可以看到最后返回的消息是从Delivery中得到的,那么看下Delivery是怎么来的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

private Delivery consumeDelivery(Channel channel, String queueName, long timeoutMillis) throws Exception {
Delivery delivery = null;
RuntimeException exception = null;
// 异步执行
CompletableFuture<Delivery> future = new CompletableFuture<>();
DefaultConsumer consumer = createConsumer(queueName, channel, future,
timeoutMillis < 0 ? DEFAULT_CONSUME_TIMEOUT : timeoutMillis);
try {
// 阻塞式的等待返回结果,receive方法中传入的receiveTimeout参数也正是在这里用到的
if (timeoutMillis < 0) {
delivery = future.get();
}
else {
delivery = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
}
}
catch (ExecutionException e) {
Throwable cause = e.getCause();
this.logger.error("Consumer failed to receive message: " + consumer, cause);
exception = RabbitExceptionTranslator.convertRabbitAccessException(cause);
throw exception;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (TimeoutException e) {
// no result in time
}
finally {
if (exception == null || !(exception instanceof ConsumerCancelledException)) {
cancelConsumerQuietly(channel, consumer);
}
}
return delivery;
}

// 异步注册消费者
private DefaultConsumer createConsumer(final String queueName, Channel channel,
CompletableFuture<Delivery> future, long timeoutMillis) throws Exception {
// 之前初始化的channel,控制流量为1
channel.basicQos(1);
// 等待其它线程完成
final CountDownLatch latch = new CountDownLatch(1);
// 初始化消费者
DefaultConsumer consumer = new TemplateConsumer(channel) {

@Override
public void handleCancel(String consumerTag) throws IOException {
future.completeExceptionally(new ConsumerCancelledException());
}

// 执行完basicConsume,消费者注册成功会回调该函数
@Override
public void handleConsumeOk(String consumerTag) {
super.handleConsumeOk(consumerTag);
// latch-1
latch.countDown();
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
future.complete(new Delivery(consumerTag, envelope, properties, body));
}

};
// 推模式消费消息
channel.basicConsume(queueName, consumer);
// 如果消费者注册超时
if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
// Cache模式关闭代理信道
if (channel instanceof ChannelProxy) {
((ChannelProxy) channel).getTargetChannel().close();
}
// 将过程中的异常抛出
future.completeExceptionally(
new ConsumeOkNotReceivedException("Blocking receive, consumer failed to consume: " + consumer));
}
return consumer;
}

4.3 @RabbitListener注解

通过注解的方式方便地接收消息:

1
2
3
4
5

@RabbitListener(queues = "test_queue_delay")
public void listen(Message message){
...
}

进入@RabbitListener注解源码,有一段注释说明了这个注解是怎么被处理的,通过注册一个RabbitListenerAnnotationBeanPostProcessor:

Processing of {@code @RabbitListener} annotations is performed by registering a
{@link RabbitListenerAnnotationBeanPostProcessor}. This can be done manually or, more
conveniently, through the {@code rabbit:annotation-driven/} element or
{@link EnableRabbit} annotation.

找到RabbitListenerAnnotationBeanPostProcessor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
// 当bean初始化完成后,在这里会获取到这个bean的类用户自己定义的所有添加了@RabbitListener注解的方法
for (ListenerMethod lm : metadata.listenerMethods) {
for (RabbitListener rabbitListener : lm.annotations) {
// 然后调用processAmqpListener()方法对这些方法进行处理,实际上是对方法上的@RabbitListener进行处理,一个方法上可以有多个@RabbitListener,会处理多次
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
// 类级别注释处理
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}

获取 @RabbitListener 注解方法的具体过程看 buildMetadata() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

//在这个方法里面,找出了所有加了@RabbitListener注解的方法
//可以在类上加@RabbitListener注解,然后在方法上加@RabbitHandler注解,如果采用这种方式会processMultiMethodListeners()方法来处理这些方法。
private TypeMetadata buildMetadata(Class<?> targetClass) {
// 先找到类级别注释
Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<ListenerMethod> methods = new ArrayList<>();
final List<Method> multiMethods = new ArrayList<>();
ReflectionUtils.doWithMethods(targetClass, method -> {
// 找到方法级别注释,记录对应方法
Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
if (listenerAnnotations.size() > 0) {
methods.add(new ListenerMethod(method,
listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
}
// 类级别注释找到对应方法
if (hasClassLevelListeners) {
RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
if (rabbitHandler != null) {
multiMethods.add(method);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
if (methods.isEmpty() && multiMethods.isEmpty()) {
return TypeMetadata.EMPTY;
}
return new TypeMetadata(
methods.toArray(new ListenerMethod[methods.size()]),
multiMethods.toArray(new Method[multiMethods.size()]),
classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}

这里我们只看processAmqpListener()方法,看它是怎么处理上面找到的这些方法的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
// 将每一个加@RabbitListener注解的方法构造一个MethodRabbitListenerEndpoint,然后调用processListener()
Method methodToUse = checkProxy(method, bean);
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object adminTarget, String beanName) {

// 省略的部分是读取@RabbitListener注解中的值,设置到endpoint中去
...

// endpoint的属性都设置完了之后,获取我们配置的RabbitListenerContainerFactory bean,然后调用RabbitListenerEndpointRegistrar类的registerEndpoint()方法
RabbitListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
containerFactoryBeanName + "' was found in the application context", ex);
}
}

this.registrar.registerEndpoint(endpoint, factory);
}

public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
// 根据startImmediately看是否需要立刻注册endpoint,或者先将其添加到一个List,稍后统一注册
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}

对于统一注册的实现,RabbitListenerAnnotationBeanPostProcessor类除了实现BeanPostProcessor以外,还实现了SmartInitializingSingleton接口,所以当RabbitListenerAnnotationBeanPostProcessor这个bean实例化完成之后会调用它的 afterSingletonsInstantiated() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

@Override
public void afterSingletonsInstantiated() {

...

// Actually register all listeners
// 因为之前已经将所有的endpoint添加到了RabbitListenerEndpointRegistrar类中的一个List中了,所以这里调用RabbitListenerEndpointRegistrar类的afterPropertiesSet()方法进行统一注册
this.registrar.afterPropertiesSet();

...
}

@Override
public void afterPropertiesSet() {
// 不管是单独注册endpoint还是统一注册,调用的是同样的方法registerAllEndpoints()
registerAllEndpoints();
}

protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
// for循环,一个一个注册
for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}

跟踪 registerListenerContainer() 方法查看具体是怎么注册的,直到进入下面这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");

String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (startImmediately) {
startIfNecessary(container);
}
}
}

可见,注册endpoint,实际上就是RabbitListenerContainerFactory将每一个endpoint都创建成MessageListenerContainer(具体创建过程,由RabbitListenerContainerFactory类自己去完成),然后根据startImmediately参数判断是否调用startIfNecessary()方法立即启动MessageListenerContainer。

实际接收消息是由这个MessageListenerContainer来做的,而MessageListenerContainer接口中有一个接口方法来设置MessageListener:

1
2
3
4
5
6
7
8
9

/**
* Setup the message listener to use. Throws an {@link IllegalArgumentException}
* if that message listener type is not supported.
* @param messageListener the {@code object} to wrapped to the {@code MessageListener}.
*/
void setupMessageListener(Object messageListener);


MessageListener将会调用我们加了@RabbitListener注解的方法处理消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

/**
* Listener interface to receive asynchronous delivery of Amqp Messages.
*
* @author Mark Pollack
* @author Gary Russell
*/
@FunctionalInterface
public interface MessageListener {

void onMessage(Message message);

}


或者是ChannelAwareMessageListener接口类来调用我们的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

/**
* A message listener that is aware of the Channel on which the message was received.
*
* @author Mark Pollack
* @author Gary Russell
*/
@FunctionalInterface
public interface ChannelAwareMessageListener {

/**
* Callback for processing a received Rabbit message.
* <p>Implementors are supposed to process the given Message,
* typically sending reply messages through the given Session.
* @param message the received AMQP message (never <code>null</code>)
* @param channel the underlying Rabbit Channel (never <code>null</code>)
* @throws Exception Any.
*/
void onMessage(Message message, Channel channel) throws Exception;
}

这样接收并处理消息的所有工作就完成了。

如果不立即启动MessageListenerContainer,RabbitListenerEndpointRegistry也实现了SmartLifecycle接口,所以在spring context refresh的最后一步会去调用start()方法:

1
2
3
4
5
6
7
   @Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}

可以看到在这里统一启动了所有的MessageListenerContainer:

1
2
3
4
5
6
   private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}

所谓启动MessageListenerContainer其实就是调用MessageListenerContainer的start()方法。这也是SmartLifecycle的一个接口方法,它的实现必须保证调用了这个start()方法之后MessageListenerContainer将能够接受到消息。

所以对@RabbitListener注解的整个处理流程就是这样。

总结一下整个实现流程:

  • @RabbitListener注解的方法所在的类首先是一个bean,因此,实现BeanPostProcessor接口对每一个初始化完成的bean进行处理。
  • 遍历bean中由用户自己定义的所有的方法,找出其中添加了@RabbitListener注解的方法(也可以是@RabbitHandler注解,上面已经讲了,不再赘述)。
  • 读取上面找出的所有方法上@RabbitListener注解中的值,并为每一个方法创建一个RabbitListenerEndpoint,保存在RabbitListenerEndpointRegistrar类中。
  • 在所有的bean都初始化完成,即所有@RabbitListener注解的方法都创建了endpoint之后,由我们配置的RabbitListenerContainerFactory将每个endpoint创建MessageListenerContainer。
  • 最后启动上面创建的MessageListenerContainer。
  • 至此,全部完成,MessageListenerContainer启动后将能够接受到消息,再将消息交给它的MessageListener处理消息。

下面还剩下几件事情才能真正实现上面的步骤:

  • RabbitListenerContainerFactory只是个接口,它不会自己创建
  • MessageListenerContainer,所以需要一个RabbitListenerContainerFactory实现类,它必须能创建MessageListenerContainer。
    MessageListenerContainer也只是一个接口,它不会自己接收消息,所以需要一个MessageListenerContainer实现类,它必须做到在启动后能够接收消息,同时它必须能设置MessageListener,用以处理消息。
  • MessageListener(或ChannelAwareMessageListener)也只是一个接口,所以还需要一个MessageListener实现类,它必须能调用我们加了@RabbitListener注解的方法。

4.5 SimpleMessageListenerContainer

(1)概述

SimpleMessageListenerContainerspringrabbitmq原生api基础上封装实现的一个消费工具类,该类非常强大,可以实现:

  • 监听单个或多个队列
  • 自动启动
  • 自动声明
  • 还支持动态配置,如动态添加监听队列、动态调整并发数等等。

基本上对RabbitMQ消费场景这个类都能满足。如@RabbitListener、cloud-stream中StreamListener中底层实现都是基于该类,所以理解SimpleMessageListenerContainer原理对理解spring rabbitmq中消费模型非常关键。

(2)基本使用

运行时添加/移除监听队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 字符串集合动态添加到queueNames中
public void addQueueNames(String... queueName) {
super.addQueueNames(queueName);
this.queuesChanged();
}

public boolean removeQueueNames(String... queueName) {
if (super.removeQueueNames(queueName)) {
this.queuesChanged();
return true;
} else {
return false;
}
}

private final Object consumersMonitor = new Object();
private Set<BlockingQueueConsumer> consumers;

private void queuesChanged() {
// 获取对象锁
synchronized(this.consumersMonitor) {
if (this.consumers != null) {
int count = 0;
// 遍历消费者集合,依次取消订阅并移出集合
for(Iterator consumerIterator = this.consumers.iterator(); consumerIterator.hasNext(); ++count) {
BlockingQueueConsumer consumer = (BlockingQueueConsumer)consumerIterator.next();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Queues changed; stopping consumer: " + consumer);
}

// 取消订阅
consumer.basicCancel(true);
consumerIterator.remove();
}

// 再重新创建消费者
this.addAndStartConsumers(count);
}

}
}

protected void addAndStartConsumers(int delta) {
// 获取对象锁
synchronized(this.consumersMonitor) {
if (this.consumers != null) {
// 创建指定数目的消费者
for(int i = 0; i < delta && (this.maxConcurrentConsumers == null || this.consumers.size() < this.maxConcurrentConsumers); ++i) {
// 使用该类默认值/创建模板时指定的参数来构建消费者
BlockingQueueConsumer consumer = this.createBlockingQueueConsumer();
// 加回集合
this.consumers.add(consumer);
// 异步启动消费者
SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Starting a new consumer: " + consumer);
}

this.getTaskExecutor().execute(processor);
if (this.getApplicationEventPublisher() != null) {
// 发布事件
this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}

try {
// 若启动流程有异常(getStartupException阻塞等待结果),则从集合移出消费者,并终止消费者
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
// 多余的一行?
this.consumers.remove(consumer);
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
} catch (InterruptedException var8) {
Thread.currentThread().interrupt();
} catch (Exception var9) {
consumer.stop();
this.logger.error("Error starting new consumer", var9);
this.cancellationLock.release(consumer);
this.consumers.remove(consumer);
}
}
}

}
}

后置处理器 setAfterReceivePostProcessors()

1
2
3
4
5
6
7
8
9
10
11
// 后置处理器,接收到的消息都添加了Header请求头
container.setAfterReceivePostProcessors(message -> {
message.getMessageProperties().getHeaders().put("desc", 10);
return message;
});

container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});

设置消费者的tag和Arguments:

1
2
3
4
5
6
7
// 设置消费者的Consumer tag
container.setConsumerTagStrategy(queue -> "order_queue_"+(++count));
// 设置消费者的Arguments
Map args = new HashMap<>();
args.put("module","订单模块");
args.put("fun","发送消息");
container.setConsumerArguments(args);

【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析1

设置并发消费者:

  • setConcurrentConsumers设置多个并发消费者一起消费,并支持运行时动态修改。
  • setMaxConcurrentConsumers设置最多的并发消费者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean 
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("xxx");
container.setConcurrentConsumers(5);
container.setMaxConcurrentConsumers(10);
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
return container;
}

(3)核心原理

API结构:

SimpleMessageListenerContainer类结构如下:

【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析2

方法入口:

SimpleMessageListenerContainer 类启动的入口是 start() 方法,该方法位于父类 AbstractMessageListenerContainer 中:

AbstractMessageListenerContainer#start
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void start() {
// 已启动,则直接退出
if (!this.isRunning()) {
// initialized未执行初始化,则执行afterPropertiesSet()方法进行初始化,执行完成后initialized设置成true
if (!this.initialized) {
synchronized(this.lifecycleMonitor) {
if (!this.initialized) {
this.afterPropertiesSet();
}
}
}

try {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Starting Rabbit listener container.");
}

// 验证RabbitAdmin,mismatchedQueuesFatal=true时,spring context中RabbitAdmin数量不能大于1
this.configureAdminIfNeeded();
// 执行RabbitAdmin#initialize方法,spring context中注入的exchanges, queues and bindings执行声明式创建
this.checkMismatchedQueues();
// 启动核心
this.doStart();
} catch (Exception var3) {
throw this.convertRabbitAccessException(var3);
}
}
}
SimpleMessageListenerContainer#doStart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// 父类
protected void doStart() throws Exception {
synchronized(this.lifecycleMonitor) {
this.active = true;
this.running = true;
this.lifecycleMonitor.notifyAll();
}
}

// 子类
protected void doStart() throws Exception {
// 如果MessageListener是ListenerContainerAware,则进行expectedQueueNames校验
if (this.getMessageListener() instanceof ListenerContainerAware) {
Collection<String> expectedQueueNames = ((ListenerContainerAware)this.getMessageListener()).expectedQueueNames();
if (expectedQueueNames != null) {
String[] queueNames = this.getQueueNames();
Assert.state(expectedQueueNames.size() == queueNames.length, "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: " + Arrays.asList(queueNames));
boolean found = false;
String[] var4 = queueNames;
int var5 = queueNames.length;

for(int var6 = 0; var6 < var5; ++var6) {
String queueName = var4[var6];
if (!expectedQueueNames.contains(queueName)) {
found = false;
break;
}

found = true;
}

Assert.state(found, () -> {
return "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: " + Arrays.asList(queueNames);
});
}
}

// 调用父类doStart()方法,主要是active和running都设置成true
super.doStart();
synchronized(this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
} else {
// 创建BlockingQueueConsumer类型consumer,每个concurrentConsumers并发对应创建一个对象,并存储到Set consumers集合中,返回值就是创建consumer对象个数
// 具体创建逻辑见:SimpleMessageListenerContainer#createBlockingQueueConsumer
// 主要注意下prefetchCount计算: int actualPrefetchCount = getPrefetchCount() > this.batchSize ? getPrefetchCount() : this.batchSize;
// 即如果prefetchCount大于batchSize,则其就是实际值,否则prefetchCount等于batchSize值
int newConsumers = this.initializeConsumers();
if (this.consumers == null) {
this.logger.info("Consumers were initialized and then cleared (presumably the container was stopped concurrently)");
} else if (newConsumers <= 0) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Consumers are already running");
}

} else {
Set<SimpleMessageListenerContainer.AsyncMessageProcessingConsumer> processors = new HashSet();
Iterator var12 = this.consumers.iterator();

while(var12.hasNext()) {
// 将BlockingQueueConsumer对象封装成AsyncMessageProcessingConsumer进行异步执行
BlockingQueueConsumer consumer = (BlockingQueueConsumer)var12.next();
SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
// 存储到processors集合中
processors.add(processor);
// 将AsyncMessageProcessingConsumer丢到线程池中执行
this.getTaskExecutor().execute(processor);
// 事件发送
if (this.getApplicationEventPublisher() != null) {
this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}

var12 = processors.iterator();

// 判断启动过程中是否存在异常
FatalListenerStartupException startupException;
do {
if (!var12.hasNext()) {
return;
}

SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = (SimpleMessageListenerContainer.AsyncMessageProcessingConsumer)var12.next();
startupException = processor.getStartupException();
} while(startupException == null);

throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
}

上面代码大致逻辑:BlockingQueueConsumer 对象可以看成 consumer ,然后将其包装成 AsyncMessageProcessingConsumer 异步任务丢入到线程池中运行。

异步任务:主要接口为AsyncMessageProcessingConsumer#run。

AsyncMessageProcessingConsumer#run
  1. 若当前consumer没有设置任何监听队列,则没必要启动
  2. 初始化:
    • 通过AmqpAdmin重新声明创建交换器、队列、绑定。
  3. 死循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
     public void run() {
// 必须处于活跃状态
if (SimpleMessageListenerContainer.this.isActive()) {
boolean aborted = false;
int consecutiveIdles = 0;
int consecutiveMessages = 0;
this.consumer.setLocallyTransacted(SimpleMessageListenerContainer.this.isChannelLocallyTransacted());
String routingLookupKey = SimpleMessageListenerContainer.this.getRoutingLookupKey();
if (routingLookupKey != null) {
SimpleResourceHolder.bind(SimpleMessageListenerContainer.this.getRoutingConnectionFactory(), routingLookupKey);
}

// 表示当前consumer没有设置任何监听队列,则没必要启动
if (this.consumer.getQueueCount() < 1) {
if (SimpleMessageListenerContainer.this.logger.isDebugEnabled()) {
SimpleMessageListenerContainer.this.logger.debug("Consumer stopping; no queues for " + this.consumer);
}
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (SimpleMessageListenerContainer.this.getApplicationEventPublisher() != null) {
SimpleMessageListenerContainer.this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}

this.start.countDown();
} else {
try {
// 1.初始化
try {
// 通过AmqpAdmin重新声明创建交换器、队列、绑定
SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
this.consumer.start();
this.start.countDown();
} catch (QueuesNotAvailableException var34) {
if (SimpleMessageListenerContainer.this.isMissingQueuesFatal()) {
throw var34;
}

this.start.countDown();
SimpleMessageListenerContainer.this.handleStartupFailure(this.consumer.getBackOffExecution());
throw var34;
} catch (FatalListenerStartupException var35) {
if (SimpleMessageListenerContainer.this.isPossibleAuthenticationFailureFatal()) {
throw var35;
}

Throwable possibleAuthException = var35.getCause().getCause();
if (possibleAuthException != null && possibleAuthException instanceof PossibleAuthenticationFailureException) {
this.start.countDown();
SimpleMessageListenerContainer.this.handleStartupFailure(this.consumer.getBackOffExecution());
throw possibleAuthException;
}

throw var35;
} catch (Throwable var36) {
this.start.countDown();
SimpleMessageListenerContainer.this.handleStartupFailure(this.consumer.getBackOffExecution());
throw var36;
}

if (SimpleMessageListenerContainer.this.getTransactionManager() != null) {
ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), SimpleMessageListenerContainer.this.getConnectionFactory());
}

// 死循环
while(SimpleMessageListenerContainer.this.isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
try {
boolean receivedOk = SimpleMessageListenerContainer.this.receiveAndExecute(this.consumer);
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
if (receivedOk) {
if (SimpleMessageListenerContainer.this.isActive(this.consumer)) {
consecutiveIdles = 0;
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
SimpleMessageListenerContainer.this.considerAddingAConsumer();
consecutiveMessages = 0;
}
}
} else {
consecutiveMessages = 0;
if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
SimpleMessageListenerContainer.this.considerStoppingAConsumer(this.consumer);
consecutiveIdles = 0;
}
}
}

long idleEventInterval = SimpleMessageListenerContainer.this.getIdleEventInterval();
if (idleEventInterval > 0L) {
if (receivedOk) {
SimpleMessageListenerContainer.this.updateLastReceive();
} else {
long now = System.currentTimeMillis();
long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
long lastReceive = SimpleMessageListenerContainer.this.getLastReceive();
if (now > lastReceive + idleEventInterval && now > lastAlertAt + idleEventInterval && SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {
SimpleMessageListenerContainer.this.publishIdleContainerEvent(now - lastReceive);
}
}
}
} catch (ListenerExecutionFailedException var37) {
if (var37.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", var37);
}
} catch (AmqpRejectAndDontRequeueException var38) {
}
}
} catch (InterruptedException var39) {
SimpleMessageListenerContainer.this.logger.debug("Consumer thread interrupted, processing stopped.");
Thread.currentThread().interrupt();
aborted = true;
SimpleMessageListenerContainer.this.publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, var39);
} catch (QueuesNotAvailableException var40) {
SimpleMessageListenerContainer.this.logger.error("Consumer received fatal=" + SimpleMessageListenerContainer.this.isMismatchedQueuesFatal() + " exception on startup", var40);
if (SimpleMessageListenerContainer.this.isMissingQueuesFatal()) {
this.startupException = var40;
aborted = true;
}

SimpleMessageListenerContainer.this.publishConsumerFailedEvent("Consumer queue(s) not available", aborted, var40);
} catch (FatalListenerStartupException var41) {
SimpleMessageListenerContainer.this.logger.error("Consumer received fatal exception on startup", var41);
this.startupException = var41;
aborted = true;
SimpleMessageListenerContainer.this.publishConsumerFailedEvent("Consumer received fatal exception on startup", true, var41);
} catch (FatalListenerExecutionException var42) {
SimpleMessageListenerContainer.this.logger.error("Consumer received fatal exception during processing", var42);
aborted = true;
SimpleMessageListenerContainer.this.publishConsumerFailedEvent("Consumer received fatal exception during processing", true, var42);
} catch (PossibleAuthenticationFailureException var43) {
SimpleMessageListenerContainer.this.logger.error("Consumer received fatal=" + SimpleMessageListenerContainer.this.isPossibleAuthenticationFailureFatal() + " exception during processing", var43);
if (SimpleMessageListenerContainer.this.isPossibleAuthenticationFailureFatal()) {
this.startupException = new FatalListenerStartupException("Authentication failure", new AmqpAuthenticationException(var43));
aborted = true;
}

SimpleMessageListenerContainer.this.publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, var43);
} catch (ShutdownSignalException var44) {
if (RabbitUtils.isNormalShutdown(var44)) {
if (SimpleMessageListenerContainer.this.logger.isDebugEnabled()) {
SimpleMessageListenerContainer.this.logger.debug("Consumer received Shutdown Signal, processing stopped: " + var44.getMessage());
}
} else {
this.logConsumerException(var44);
}
} catch (AmqpIOException var45) {
if (var45.getCause() instanceof IOException && var45.getCause().getCause() instanceof ShutdownSignalException && var45.getCause().getCause().getMessage().contains("in exclusive use")) {
SimpleMessageListenerContainer.this.getExclusiveConsumerExceptionLogger().log(SimpleMessageListenerContainer.this.logger, "Exclusive consumer failure", var45.getCause().getCause());
SimpleMessageListenerContainer.this.publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, var45);
} else {
this.logConsumerException(var45);
}
} catch (Error var46) {
SimpleMessageListenerContainer.this.logger.error("Consumer thread error, thread abort.", var46);
SimpleMessageListenerContainer.this.publishConsumerFailedEvent("Consumer threw an Error", true, var46);
aborted = true;
} catch (Throwable var47) {
if (SimpleMessageListenerContainer.this.isActive()) {
this.logConsumerException(var47);
}
} finally {
if (SimpleMessageListenerContainer.this.getTransactionManager() != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}

}

this.start.countDown();
if (SimpleMessageListenerContainer.this.isActive(this.consumer) && !aborted) {
SimpleMessageListenerContainer.this.logger.info("Restarting " + this.consumer);
SimpleMessageListenerContainer.this.restart(this.consumer);
} else {
SimpleMessageListenerContainer.this.logger.debug("Cancelling " + this.consumer);

try {
this.consumer.stop();
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (SimpleMessageListenerContainer.this.getApplicationEventPublisher() != null) {
SimpleMessageListenerContainer.this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}
} catch (AmqpException var33) {
SimpleMessageListenerContainer.this.logger.info("Could not cancel message consumer", var33);
}

if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort.compareAndSet((Object)null, Thread.currentThread())) {
SimpleMessageListenerContainer.this.logger.error("Stopping container from aborted consumer");
SimpleMessageListenerContainer.this.stop();
SimpleMessageListenerContainer.this.containerStoppingForAbort.set((Object)null);
ListenerContainerConsumerFailedEvent event = null;

do {
try {
event = (ListenerContainerConsumerFailedEvent)SimpleMessageListenerContainer.this.abortEvents.poll(5L, TimeUnit.SECONDS);
if (event != null) {
SimpleMessageListenerContainer.this.publishConsumerFailedEvent(event.getReason(), event.isFatal(), event.getThrowable());
}
} catch (InterruptedException var32) {
Thread.currentThread().interrupt();
}
} while(event != null);
}
}

if (routingLookupKey != null) {
SimpleResourceHolder.unbind(SimpleMessageListenerContainer.this.getRoutingConnectionFactory());
}

}
}
}
BlockingQueueConsumer#start
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// 主要完成与`Rabbit Broker`指令交互
public void start() throws AmqpException {
if (logger.isDebugEnabled()) {
logger.debug("Starting consumer " + this);
}

this.thread = Thread.currentThread();

try {
this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory, this.transactional);
this.channel = this.resourceHolder.getChannel();
ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel);
} catch (AmqpAuthenticationException var8) {
throw new FatalListenerStartupException("Authentication failure", var8);
}

this.deliveryTags.clear();
this.activeObjectCounter.add(this);
int passiveDeclareRetries = this.declarationRetries;
this.declaring = true;

while(!this.cancelled()) {
try {
// 校验监听队列是否存在
// `channel.queueDeclarePassive(queueName)`,最终会向`Rabbit Broker`发送`queue.declare`指令,并设置`passive=true`
this.attemptPassiveDeclarations();
if (passiveDeclareRetries < this.declarationRetries && logger.isInfoEnabled()) {
logger.info("Queue declaration succeeded after retrying");
}

passiveDeclareRetries = 0;
} catch (BlockingQueueConsumer.DeclarationException var10) {
if (passiveDeclareRetries > 0 && this.channel.isOpen()) {
if (logger.isWarnEnabled()) {
logger.warn("Queue declaration failed; retries left=" + passiveDeclareRetries, var10);

try {
Thread.sleep(this.failedDeclarationRetryInterval);
} catch (InterruptedException var7) {
this.declaring = false;
Thread.currentThread().interrupt();
this.activeObjectCounter.release(this);
throw RabbitExceptionTranslator.convertRabbitAccessException(var7);
}
}
} else {
if (var10.getFailedQueues().size() >= this.queues.length) {
this.declaring = false;
this.activeObjectCounter.release(this);
throw new QueuesNotAvailableException("Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.", var10);
}

if (logger.isWarnEnabled()) {
logger.warn("Not all queues are available; only listening on those that are - configured: " + Arrays.asList(this.queues) + "; not available: " + var10.getFailedQueues());
}

this.missingQueues.addAll(var10.getFailedQueues());
this.lastRetryDeclaration = System.currentTimeMillis();
}
}

if (passiveDeclareRetries-- <= 0 || this.cancelled()) {
break;
}
}

this.declaring = false;
if (!this.acknowledgeMode.isAutoAck() && !this.cancelled()) {
try {
// 最终会向`Rabbit Broker`发送`basic.qos`指令,并将`prefetch-size`、`prefetch-count`和`global`参数设置过去
this.channel.basicQos(this.prefetchCount);
} catch (IOException var6) {
this.activeObjectCounter.release(this);
throw new AmqpIOException(var6);
}
}

try {
if (!this.cancelled()) {
String[] var2 = this.queues;
int var3 = var2.length;

for(int var4 = 0; var4 < var3; ++var4) {
String queueName = var2[var4];
if (!this.missingQueues.contains(queueName)) {
// 会使用`channel.basicConsume`方法订阅消息,最终会向`Rabbit Broker`发送`basic.consume`指令,并指定订阅消息的`queue`名称等参数消息
// 注意:`SimpleMessageListenerContainer`可能设置多个监听队列,则`BlockingQueueConsumer`这里会给每个监听队列都向Broker发送一个`basic.consume`订阅指令,并且是使用同一个`channel`:
this.consumeFromQueue(queueName);
}
}
}

} catch (IOException var9) {
throw RabbitExceptionTranslator.convertRabbitAccessException(var9);
}
}

响应处理:

上面分析

  • initialize()初始化操作,客户端向Broker发送basic.qosbasic.consume指令就相当于告诉了服务器:我都准备好了,如果监听队列有消息你就把它推送给我,下面就来分析下Broker消息推送流程。

  • 死循环:Rabbit Broker接收到Basic.consume指令后,会向客户端反馈Basic.consume-ok指令,表示服务端一切就绪准备给客户端推送消息,然后就通过Basic.Deliver指令类型将消息推送给客户端,一条消息对应一个Deliver反馈,客户端接收到服务端返回过来的指令类型后,在ChannelN#processAsync方法进行判断处理,它是amqp-client依赖包中类:

    【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析7

如果是Deliver类型指令,则调用 processDelivery() 方法进行处理:

1
2
protected void processDelivery(Command command, Basic.Deliver method) { Basic.Deliver m = method; //根据Deliver的consumerTag获取到InternalConsumer对象,因为一个Channel上可能存在多个consumer,需要找到Broker是针对哪个consumer进行的响应 Consumer callback = _consumers.get(m.getConsumerTag()); if (callback == null) { if (defaultConsumer == null) { // No handler set. We should blow up as this message // needs acking, just dropping it is not enough. See bug // 22587 for discussion. throw new IllegalStateException("Unsolicited delivery -" + " see Channel.setDefaultConsumer to handle this" + " case."); } else { callback = defaultConsumer; } } Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(), m.getExchange(), m.getRoutingKey()); try { // call metricsCollector before the dispatching (which is async anyway) // this way, the message is inside the stats before it is handled // in case a manual ack in the callback, the stats will be able to record the ack metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag()); this.dispatcher.handleDelivery(callback, m.getConsumerTag(), envelope, (BasicProperties) command.getContentHeader(), command.getContentBody()); } catch (WorkPoolFullException e) { // couldn't enqueue in work pool, propagating throw e; } catch (Throwable ex) { getConnection().getExceptionHandler().handleConsumerException(this, ex, callback, m.getConsumerTag(), "handleDelivery"); }
}

processDelivery()处理Broker返回的Deliver消息大致流程:

  • Consumer callback = _consumers.get(m.getConsumerTag()):根据DeliverconsumerTag获取到InternalConsumer对象,因为一个Channel上可能存在多个consumer,需要找到Broker是针对哪个consumer进行的响应

  • 封装DeliverEnvelope

    1
    Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(), m.getExchange(), m.getRoutingKey());
  • metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag()):统计数据处理

  • 调用ConsumerDispatcher#handleDelivery,其会创建任务丢到线程池中执行,任务:将数据交由具体的consumer处理,即调用InternalConsumer#handleDelivery

    1
    this.dispatcher.handleDelivery(callback, m.getConsumerTag(), envelope, (BasicProperties) command.getContentHeader(), command.getContentBody());
  • InternalConsumer#handleDelivery()方法:将Broker返回的Deliver数据放入到BlockingQueueConsumer.queue中:

1
BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));

所以,如果ListenerContainer监听多个队列,则BlockingQueueConsumer中则对应多个InternalConsumer,每个InternalConsumer映射Broker上的一个,BlockingQueueConsumer下所有InternalConsumer共享同一个queue

业务处理:

上面分析了消息订阅以及Broker推送过来的消息数据会被缓存到BlockingQueueConsumer对象的queue队列中,下面就来分析下从queue中提取消息到传递给用户业务逻辑这个流程。这就需要分析AsyncMessageProcessingConsumer#run方法中另一个非常重要操作:无限循环mainLoop操作,它主要就是完成从queue中提取消息数据然后经过一系列操作最终传递给用户逻辑MessageListener中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void mainLoop() throws Exception { // NOSONAR Exception
try {
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) { checkAdjust(receivedOk);
}
long idleEventInterval = getIdleEventInterval();
if (idleEventInterval > 0) { if (receivedOk) { updateLastReceive(); } else { long now = System.currentTimeMillis(); long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get(); long lastReceive = getLastReceive(); if (now > lastReceive + idleEventInterval && now > lastAlertAt + idleEventInterval && SimpleMessageListenerContainer.this.lastNoMessageAlert .compareAndSet(lastAlertAt, now)) { publishIdleContainerEvent(now - lastReceive); } }
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) { throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
/* * These will normally be wrapped by an LEFE if thrown by the * listener, but we will also honor it if thrown by an
* error handler.
*/
}
}

跟踪下doReceiveAndExecute():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR

Channel channel = consumer.getChannel();

List messages = null;
long deliveryTag = 0; //batchSize默认是1,用于指定一次从queue中提取消息数量
for (int i = 0; i < this.batchsize; i++) {> afterReceivePostProcessors = getAfterReceivePostProcessors(); if (afterReceivePostProcessors != null) { Message original = message; deliveryTag = message.getMessageProperties().getDeliveryTag(); for (MessagePostProcessor processor : getAfterReceivePostProcessors()) { message = processor.postProcessMessage(message); if (message == null) { channel.basicAck(deliveryTag, false); if (this.logger.isDebugEnabled()) { this.logger.debug( "Message Post Processor returned 'null', discarding message " + original); } break; } } } if (message != null) { if (messages == null) { messages = new ArrayList<>(this.batchSize); } if (isDeBatchingEnabled() && getBatchingStrategy().canDebatch(message.getMessageProperties())) { final List messageList = messages; getBatchingStrategy().deBatch(message, fragment -> messageList.add(fragment)); } else { messages.add(message); } }
}
else { messages = debatch(message); if (messages != null) { break; } try { //执行MessageListener executeListener(channel, message); } catch (ImmediateAcknowledgeAmqpException e) { if (this.logger.isDebugEnabled()) { this.logger.debug("User requested ack for failed delivery '" + e.getMessage() + "': " + message.getMessageProperties().getDeliveryTag()); } break; } catch (Exception ex) { if (causeChainHasImmediateAcknowledgeAmqpException(ex)) { if (this.logger.isDebugEnabled()) { this.logger.debug("User requested ack for failed delivery: " + message.getMessageProperties().getDeliveryTag()); } break; } if (getTransactionManager() != null) { if (getTransactionAttribute().rollbackOn(ex)) { RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager .getResource(getConnectionFactory()); if (resourceHolder != null) { consumer.clearDeliveryTags(); } else { /* * If we don't actually have a transaction, we have to roll back * manually. See prepareHolderForRollback(). */ consumer.rollbackOnExceptionIfNecessary(ex); } throw ex; // encompassing transaction will handle the rollback. } else { if (this.logger.isDebugEnabled()) { this.logger.debug("No rollback for " + ex); } break; } } else { consumer.rollbackOnExceptionIfNecessary(ex); throw ex; } } }
}
if (messages != null) { executeWithList(channel, messages, deliveryTag, consumer);
} return consumer.commitIfNecessary(isChannelLocallyTransacted());

}

总结:

上面对SimpleMessageListenerContainer核心源码进行分析,比较枯燥不太直观,总结下其最核心就是位于AsyncMessageProcessingConsumer#run方法中两个操作:initialize()和无限循环mainLoop()

initialize()方法主要完成:通过指令方式将需要监听队列信息告诉Rabbit BrokerBroker在监听队列中有消息数据时通过Deliver指令将消息推送给客户端,客户端接收的Deliver指令后,根据consumerTag分发(dispatcher)给具体consumer,然后consumer将其放入到其所属BlockingQueueConsumer对象的队列queue中,其逻辑可见下图:

【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析8

BlockingQueueConsumerAsyncMessageProcessingConsumer、监听队列等关系:

1、BlockingQueueConsumer相当于一个逻辑消费者,通过封装成AsyncMessageProcessingConsumer异步任务,然后丢到线程池中运行,线程池可以通过SimpleMessageListenerContainer#setTaskExecutor进行自定义配置,所以,BlockingQueueConsumer可以看成单独线程运行,且对应一个Channel

2、SimpleMessageListenerContainer可以监听多个队列消息,每个队列又会创建一个InternalConsumer对象,用于映射Broker上的consumer概念,它们是共用同一个channel,即channel下存在多个consumer,它们之间通过consumerTag区分,另外,Broker推送消息也是根据consumerTag识别具体推送给哪个consumer进行处理;

案例,比如:

1
2
3
container.setQueueNames("test01", "test02");
container.setConcurrentConsumers(3);
container.setConsumerTagStrategy(queue -> "consumer idx:"+consumerIdx.getAndIncrement());

a、根据并发数concurrentConsumers创建对应数量的BlockingQueueConsumer,然后封装成AsyncMessageProcessingConsumer,再分配一个线程进行执行,这里设置成3,所以会有3个线程运行AsyncMessageProcessingConsumer,每个AsyncMessageProcessingConsumer对应一个channel,所以会创建3个channel,在Web UI上可以看到对应channel:

【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析9

b、每个监听队列创建一个InternalConsumer和Broker的consumer进行映射,这里有两个监听队列,所以每个channel下会存在2个consumer:

【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析10

AsyncMessageProcessingConsumer如何订阅:

a、首先发送Basic.Qos指令约定消息推送速率问题;

b、然后发送Basic.Consume指令告诉Broker客户端要开始订阅什么队列上的消息,以及把consumerTag带上,因为可能存在多个监听队列,则同一个channel上可能会发送多次Basic.Consume指令,Brokerchannel推送消息时需要根据consumerTag找到对应consumer处理;

c、Broker通过Deliver指令类型方式向客户端推送消息,客户端接收到消息后,根据consumerTag找到对应consumer交由其进行处理,即分发dispatcher;

d、这里的consumer对应的是InternalConsumer,它处理逻辑就是放入到它所在的BlockingQueueConsumer对象中消息队列queue中;

mainLoop


Broker推送过来的消息放入到了BlockingQueueConsumer对象的消息队列queue中,后续就是从queue中提取消息进行业务处理,逻辑见下图:

【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析11

a、AsyncMessageProcessingConsumer被丢入到线程池中执行,则其对应一个线程;

b、这个线程会一直循环执行mainLoop()方法;

c、mainLoop()方法中就会从queue中提取消息,根据batchSize确定每次提取消息数量,最后回调MessageListener,实现将消息传递到业务逻辑进行处理;

d、注意:所有的AsyncMessageProcessingConsumer共用同一个MessageListener对象,对象状态要注意线程安全问题;

总体流程


【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析12

【RabbitMQ分析】01 SimpleMessageListenerContainer原理分析 - 华为云 (huaweicloud.com)

MessageListenerContainer的创建

首先看AbstractRabbitListenerContainerFactory抽象类的下面这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
C instance = createContainerInstance();
...

endpoint.setupListenerContainer(instance);
...

initializeContainer(instance, endpoint);

return instance;
}


注意里面两个方法,后面这个方法里面SimpleRabbitListenerContainerFactory会做一些它独有的属性设置,前一个方法执行结束,MessageListener就设置到MessageListenerContainer里面去了,可以跟踪这个方法,一直到AbstractRabbitListenerEndpoint类的下面这个方法:

1
2
3
4
5
6
7
8

private void setupMessageListener(MessageListenerContainer container) {
MessageListener messageListener = createMessageListener(container);
Assert.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener");
container.setupMessageListener(messageListener);
}


可以看到在这个方法里创建了MessageListener,并将其设置到MessageListenerContainer里面去。

createMessageListener()方法有两个实现,实际调用的是MethodRabbitListenerEndpoint类里面的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13

@Override
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
...

MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
...

return messageListener;
}


看到setHandlerMethod(configureListenerAdapter(messageListener))这一行,这里创建并设置了一个HandlerAdapter,这个HandlerAdapter能够调用我们加了@RabbitListener注解的方法。

SimpleMessageListenerContainer接收消息的实现

SimpleRabbitListenerContainerFactory创建的MessageListenerContainer是SimpleMessageListenerContainer类,下面看它是怎么在启动后就能接收消息的。

上面讲过RabbitListenerEndpointRegistry类通过调用MessageListenerContainer的start()方法类启动这个MessageListenerContainer。

SimpleMessageListenerContainer类本身并没有实现start()方法,在它继承的抽象父类里面。进入AbstractMessageListenerContainer抽象类找到start()方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

@Override
public void start() {
if (isRunning()) {
return;
}
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
this.initialized = true;
}
}
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Starting Rabbit listener container.");
}
configureAdminIfNeeded();
checkMismatchedQueues();
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
}


真正的启动方法是doStart(),所以去SimpleMessageListenerContainer类中找这个类的doStart()实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

@Override
protected void doStart() throws Exception {
...

int newConsumers = initializeConsumers();

...

for (BlockingQueueConsumer consumer : this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
...
}


这个方法很长,细节就不去深究了,这里注意两个方法,一个是initializeConsumers():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

protected int initializeConsumers() {
int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
this.cancellationLock.reset();
this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
for (int i = 0; i < this.concurrentConsumers; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
count++;
}
}
}
return count;
}


这个方法创建了BlockingQueueConsumer,数量等于concurrentConsumers参数的配置。

另一个方法是getTaskExecutor().execute(processor),前面用BlockingQueueConsumer创建了AsyncMessageProcessingConsumer(实现了Runnable接口),这里获取到Executor来执行,每一个MessageListenerContainer都有各自的Executor。

在AsyncMessageProcessingConsumer类的run()方法里面可以找到下面这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
try {
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
if (receivedOk) {
if (isActive(this.consumer)) {
consecutiveIdles = 0;
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
considerAddingAConsumer();
consecutiveMessages = 0;
}
}
}

...
}

...
}

...
}

这里有两个地方需要注意。

一个是this.consumer.hasDelivery()

1
2
3
4
5
	protected boolean hasDelivery() {
return !this.queue.isEmpty();
}

private final BlockingQueue<Delivery> queue;

另一个要注意的是receiveAndExecute()方法,跟踪进去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR

Channel channel = consumer.getChannel();

for (int i = 0; i < this.txSize; i++) {

logger.trace("Waiting for message from consumer.");
Message message = consumer.nextMessage(this.receiveTimeout);
if (message == null) {
break;
}
try {
executeListener(channel, message);
}
...
}
}

consumer.nextMessage(this.receiveTimeout);会从上面那个BlockingQueue里面拿一条消息出来。

所以SimpleMessageListenerContainer接收消息的实现方案是:用一个BlockingQueue保存rabbitmq发过来还未来得及处理的消息,然后向Executor提交执行Runnable,Runnable中循环从BlockingQueue里面取消息。

至于这个BlockingQueue里面的消息是怎么从rabbitmq获取到的,此处暂不讨论。

MessageListener调用@RabbitListener注解方法处理消息的实现

上面的receiveAndExecute()方法接收消息的同时也将其处理了,继续跟踪,直到进入下面这个方法:

1
2
3
4
5
6
7
8
9
10

protected void executeListener(Channel channel, Message messageIn) throws Exception {

...

invokeListener(channel, message);

...
}

在这个方法里面可以看到invokeListener()方法:

1
2
3
4

protected void invokeListener(Channel channel, Message message) throws Exception {
this.proxy.invokeListener(channel, message);
}

这里有个proxy,这个proxy是由下面这个方法创建的匿名类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
   protected void actualInvokeListener(Channel channel, Message message) throws Exception {
Object listener = getMessageListener();
if (listener instanceof ChannelAwareMessageListener) {
doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
}
else if (listener instanceof MessageListener) {
boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
if (bindChannel) {
RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
resourceHolder);
}
try {
doInvokeListener((MessageListener) listener, message);
}
finally {
if (bindChannel) {
// unbind if we bound
TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
}
}
}
else if (listener != null) {
throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: "
+ listener);
}
else {
throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
}
}


这个方法里可以看到doInvokeListener()方法,已经差不多接近我们的@RabbitListener注解的方法了,继续跟踪:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
throws Exception {

...

try {
listener.onMessage(message, channelToUse);
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
}

...
}

跟踪listener.onMessage()方法,直到进入MessagingMessageListenerAdapter类的onMessage()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

@Override
public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception {
Message<?> message = toMessagingMessage(amqpMessage);
if (logger.isDebugEnabled()) {
logger.debug("Processing [" + message + "]");
}
try {
Object result = invokeHandler(amqpMessage, channel, message);
if (result != null) {
handleResult(result, amqpMessage, channel, message);
}
else {
logger.trace("No result object given - no result to handle");
}
}
catch (ListenerExecutionFailedException e) {
if (this.errorHandler != null) {
try {
Object result = this.errorHandler.handleError(amqpMessage, message, e);
if (result != null) {
handleResult(result, amqpMessage, channel, message);
}
else {
logger.trace("Error handler returned no result");
}
}
catch (Exception ex) {
returnOrThrow(amqpMessage, channel, message, ex, ex);
}
}
else {
returnOrThrow(amqpMessage, channel, message, e.getCause(), e);
}
}
}

这里通过invokHandler()方法消费获取到的message,然后在catch里面处理异常,进入invokeHandler()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

private Object invokeHandler(org.springframework.amqp.core.Message amqpMessage, Channel channel,
Message<?> message) {
try {
return this.handlerMethod.invoke(message, amqpMessage, channel);
}
catch (MessagingException ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
"be invoked with the incoming message", message.getPayload()), ex, amqpMessage);
}
catch (Exception ex) {
throw new ListenerExecutionFailedException("Listener method '" +
this.handlerMethod.getMethodAsString(message.getPayload()) + "' threw exception", ex, amqpMessage);
}
}


在这里可以看到catch了所有的异常,也就是说只要是我们消费消息的方法里面抛出来的异常全都会被包装成ListenerExecutionFailedException,并且这个Exception里面把消息也放进去了。

这里的this.handlerMethod其实就是上面提到的HandlerAdapter,跟踪它的invoke()方法,看它是怎么调用我们@RabbitListener注解的方法的。

最后我们跟踪到InvocableHandlerMethod类的下面这个方法:

1
2
3
4
5
6
7
8
9
10

@Nullable
protected Object doInvoke(Object... args) throws Exception {
ReflectionUtils.makeAccessible(getBridgedMethod());
try {
return getBridgedMethod().invoke(getBean(), args);
}
...
}

这里通过getBridgedMethod()方法拿到的就是@RabbitListener注解的方法了,这是在刚开始处理@RabbitListener注解时就已经保存下来的,然后就可以利用反射来调用这个方法,这样就完成了接收并处理消息的整个流程。

4.2 CachingConnectionFactory

因为上一个项目中使用了RabbitMQ,但是当时没有考虑过性能的问题,今天觉得好像不对劲,大量的重复建立连接,造成了很大的性能浪费,于是我就找呀找,发现Spring提供了一种RabbitMQ连接池,所以今天我们来看一下它是如何设计的。

  • CachingConnectionFactory

    CachingConnectionFactory为我们提供了两种缓存的模式:

    • CHANNEL模式:这也是CachingConnectionFactory的默认模式,在这种模式下,所有的createConnection()方法实际上返回的都是同一个Connection,同样的Connection.close()方法是没用的,因为就一个,默认情况下,Connection中只缓存了一个Channel,在并发量不大的时候这种模式是完全够用的,当并发量较高的时候,我们可以setChannelCacheSize()来增加Connection中缓存的Channel的数量。
    • CONNECTION模式:在CONNECTION模式下,每一次调用createConnection()方法都会新建一个或者从缓存中获取,根据你设置的ConnectionCacheSize的大小,当小于的时候会采用新建的策略,当大于等于的时候会采用从缓存中获取的策略,与CHANNEL模式不同的是,CONNECTION模式对Connection和Channel都进行了缓存,最新版本的client中已经将Channel的缓存数量从1增加到了25,但是在并发量不是特别大的情况下,作用并不是特别明显。
      使用CachingConnectionFactory需要注意的一点是:所有你获取的Channel对象必须要显式的关闭,所以finally中一定不要忘记释放资源,如果忘记释放,则可能造成连接池中没有资源可用
      好了,我们来看一下创建Connection源码的实现:
  • createConnection()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    synchronized (this.connectionMonitor) {
// CHANNEL模式下,这里的connection是ChannelCachingConnectionProxy 代理对象
//这样做的目的是为Channel提供临时的存储空间(也就是缓存Channel),以便其他客户端调用
if (this.cacheMode == CacheMode.CHANNEL) {
//确保Connection对象不为null,target是真实的连接
if (this.connection.target == null) {
//第一次调用 createConnection 方法时 connection.target 值为 null,因此会调用 createBareConnection 方法创建出 SimpleConnection 赋值给 connection.target
//SimpleConnection 中delegate属性是真正的RabbitMQ 连接(AMQConnection)
this.connection.target = super.createBareConnection();
// invoke the listener *after* this.connection is assigned
if (!this.checkoutPermits.containsKey(this.connection)) {
// Map<Connection, Semaphore> checkoutPermits 中存放了信道的许可数量,也就是默认的25,通过信号量来同步资源
this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
}
this.connection.closeNotified.set(false);
//向所有 ConnectionListener 发布 onCreate 事件
getConnectionListener().onCreate(this.connection);
}
return this.connection;
}
else if (this.cacheMode == CacheMode.CONNECTION) {
//直接从缓存中获取
return connectionFromCache();
}
}

创建Channel的源码实现:

  • createChannel()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
        Semaphore permits = null;
//大于0的情况下才会通过 Semaphore 限制当前连接下可用的信道数量
if (this.channelCheckoutTimeout > 0) {
//获取许可
permits = obtainPermits(connection);
}
//获取当前Connection的Channel代理集合
LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional);
ChannelProxy channel = null;
if (connection.isOpen()) {
//这里主要是从缓存中获取,在同步块中,先判断 channelList 是否为空,若不为空,则返回队列头部缓存的 ChannelProxy(要从队列中移除)。
//如果没有可用的缓存信道,则通过 getCachedChannelProxy 方法创建新的 ChannelProxy。
channel = findOpenChannel(channelList, channel);
if (channel != null) {
if (logger.isTraceEnabled()) {
logger.trace("Found cached Rabbit Channel: " + channel.toString());
}
}
}
if (channel == null) {
try {
//创建新Channel 的过程
channel = getCachedChannelProxy(connection, channelList, transactional);
}
catch (RuntimeException e) {
if (permits != null) {
permits.release();
if (logger.isDebugEnabled()) {
logger.debug("Could not get channel; released permit for " + connection + ", remaining:"
+ permits.availablePermits());
}
}
throw e;
}
}
return channel;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    private ChannelProxy getCachedChannelProxy(ChannelCachingConnectionProxy connection,
LinkedList<ChannelProxy> channelList, boolean transactional) { //NOSONAR LinkedList for addLast()
//通过Connection中delegate创建Channel对象
Channel targetChannel = createBareChannel(connection, transactional);
if (logger.isDebugEnabled()) {
logger.debug("Creating cached Rabbit Channel from " + targetChannel);
}
//向所有 ChannelListener 发布 onCreate 事件
getChannelListener().onCreate(targetChannel, transactional);
Class<?>[] interfaces;
//通过 Proxy.newProxyInstance创建一个实现了ChannelProxy接口的动态代理对象。
//所有对该实例的方法调用都会转交给CachedChannelInvocationHandler 的 invoke 方法处理
if (this.publisherConfirms || this.publisherReturns) {
interfaces = new Class<?>[] { ChannelProxy.class, PublisherCallbackChannel.class };
}
else {
interfaces = new Class<?>[] { ChannelProxy.class };
}
return (ChannelProxy) Proxy.newProxyInstance(ChannelProxy.class.getClassLoader(),
interfaces, new CachedChannelInvocationHandler(connection, targetChannel, channelList,
transactional));
}

五. 案例

5.1 自动重连

Spring AMQP提供了一些高级特性来解决协议错误或者代理失败发生时的恢复与自动重连:

  • 主要通过CachingConnectionFactory来实现;
  • 使用RabbitAdmin进行自动声明也很有用处;
  • 如果你比较在乎发送的质量,以也许会使用RabbitTemplate和SimpleMessageListenerContainer的channelTransacted属性,还有 SimpleMessageListenerContainer的Acknowledge.AUTO属性。

自动声明交换器、队列和绑定:

  • RabbitAdmin在启动的时候会声明exchange,queues和binding。它是延迟完成的,是在ConnectionListener中,所以如果在应用启动的时候消息代理不可用也没关系。
  • Connection被第一次使用的时候,监听器会被触发,admin这一特色会被应用。
  • 监听器中的自动声明的另外一个好处是如果连接由于某种原因断了(代理挂掉,或者网络失灵),下次连接时候它又会被启用。
  • 注意:以这种方式启动队列必须要有固定的名称。要么显示声明,要么通过框架为AnonymousQueues自动生成。 AnonymousQueues不可以持久化,专属的,自动删除。
  • 重点:自动声明只能在CachingConnectionFactory缓存模式是CHANNEL时候启用。这个限制的存在是因为专属的或者自动删除的队列和连接绑定。

同步操作失败和重试选项:

  • 如果在使用RabbitTemplate同步操作的过程中丢失了到代理的连接,Spring AMQP会抛出AmqpException(通常是AmqpIOException)。
  • 所以我们不会隐藏这一问题,所以你必须能够捕获并且响应这一异常。最简单处理方式是你认为连接断开,不是你这面的问题,重试这些操作。你可以手动的做这些,或者你可以使用Spring Retry来完成这些。
  • Spring Retry提供了一些AOP拦截器,重试的许多参数来简化重试。
  • Spring AMQP也提供了一些工厂来方便Spring Retry的创建。给你提供了强类型回调接口来实现定制的恢复逻辑。参看StatefulRetryOperationsInterceptor和StatelessRetryOperationsInterceptor获得更详细的说明。
  • 如果没有事务或者事务在重试回调中,无状态重试就可以。
  • 对于存在正在进行的事务或者要进行回滚,无状态重试并不适合。
  • 在事务之间断开连接和事务回滚有着同样的效果,在这种情况下,有状态重试是最佳选择。

从1.3版本开始,一个创建器API提供来用于装配拦截器:

1
2
3
4
5
6
7
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}

只有重试的一部分功能可以通过这种方式来配置,如果需要更为高级的功能,你可以通过RetryTemplate来完成。

报文监听器和异步场景:

  • 如果MessageListener由于业务而失败而抛出异常,那么这个异常将会被消息监听器容器处理,接着它会处理另外一条消息。
  • 如果这个错误是因为连接断开导致,那么接收消息的消费者必须必须取消并且重新启动。SimpleMessageListenerContainer无缝的处理了这些,并且记录了监听器被重新启动的日志。事实上,它会进行无限的反复尝试来启动这个消费者,如果消费者行为很糟糕否则它不会放弃。一个副作用是如果代理挂掉,它会不断尝试直到连接重新建立。
  • 业务异常的处理和协议错误,连接断开的处理不一样,需要更多的考虑或者配置,尤其是在事务或者容器应答被启用的时候。在RabbitMQ2.8.x版本以前,对于dead letter没有明确的定义,所以因为业务处理异常而导致的拒绝或者回滚将导致消息被无限的发送下去。为了限制客户端重新发送的数目,以种选择是在监听器上使用 StatefulRetryOperationsInterceptor拦截器增强。这个监听器有一个恢复回调,这个回调中实现dead letter的行为。
  • 另外一种处理方式,是设置rejectRqueued属性为false,这将导致所有的错误消息被丢弃。当RabbitMQ的版本是2.8.x或者是更高,这将使得消息转发到Dead Letter Exchange。
  • 或者你可以抛出AmqpRejectAndDontRequeueException,这样阻止消息被重新塞入队列,不管defaultRequeueRejected是否设置。

通常,两种技术可以结合使用。在增强链中使用StatefulRetryOperationsInterceptor,它的MessageRecover会抛出AmqpRejectAndDontRequeueException异常。如果尝试已经耗尽 MessageRecover将会被调用。默认的 MessageRecover简单的消费错误的消息,并且释放出警告信息。在这种情况下,消息得到了应答,但是不会被发送到Dead Letter Exchange。

从1.3版本开始,RepublishMessageRecoverer被提供在重试耗尽的情况下,来重新发布消息。

1
2
3
4
5
6
7
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.withMaxAttempts(5)
.setRecoverer(newRepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
.build();
}

重试异常分类:

  • Spring Retry拥有很大的灵活性来决定什么样的异常可以出发重试。默认的情况下,所有的异常都会出发重试。在用户异常被包装在ListenerExecutionFailedException里面的情况下,我们需要分类器检测到异常起因。默认情况下,分类器只查看顶级异常。
  • 自从Spring1.0.3开始,分类器BinaryExceptionClassifier有一个traverseCauses属性。当设置为true时,它将横向查找匹配的异常源。
  • 为了使用这个分类器进行重试,使用SimpleRetryPolicy,它的构造函数中有最大尝试次数,异常Map,还有 traverseCauses选项。你需要将这个策略注入到RetryTemplate中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Configuration
public class ConsumerRabbitConfig {

@Bean(name = "consumerConnectionFactory")
public CustomCachingConnectionFactory consumerConnectionFactory(@Value("${spring.rabbitmq.consumer.addresses}") String addresses,
@Value("${spring.rabbitmq.consumer.username}") String username,
@Value("${spring.rabbitmq.consumer.password}") String password,
@Value("${spring.rabbitmq.consumer.virtual-host}") String virtualHost) {
CustomCachingConnectionFactory connectionFactory = new CustomCachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setConnectionListeners(Arrays.asList(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
try {
logger.info("**********************创建rabbitmq链接:{}", connectionFactory.getReBindings().size());
Channel channel = connectionFactory.createConnection().createChannel(false);
for (CustomCachingConnectionFactory.ReBindingHandler reBinding : connectionFactory.getReBindings()) {
reBinding.reBinding(channel);
}
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}

@Override
public void onClose(Connection connection) {
logger.info("----------------rabbitmq链接断开");
}
}));
return connectionFactory;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CustomCachingConnectionFactory extends CachingConnectionFactory {

private List<ReBindingHandler> reBindings = new ArrayList<>();

public List<ReBindingHandler> getReBindings() {
return reBindings;
}

public CustomCachingConnectionFactory() {
super();
}

public void addReBindingHandler(ReBindingHandler reBindingHandler){
reBindings.add(reBindingHandler);
}

public interface ReBindingHandler {
/**
* 重新绑定队列,路由键绑定关系
* @param channel
*/
void reBinding(Channel channel);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@Configuration
public class Queue1ConsumerConfig {
/**
* 消费者最小个数
*/
@Value("${spring.rabbitmq.minnum.task.consumers}")
private int minNumConsumers;

/**
* 消费者最大个数
*/
@Value("${spring.rabbitmq.maxnum.task.consumers}")
private int maxNumConsumers;

/**
* 交换器名称
*/
public static final String EXCHANGE = "exchange";

/**
* 队列名称
*/
public static final String QUEUE = "queue1";

/**
* 队列路由键值
*/
public static final String ROUTING_KEY = "routing1";

@Bean(name = "queue1ConsumerRabbitTemplate")
public RabbitTemplate queue1ConsumerRabbitTemplate(
@Qualifier("consumerConnectionFactory") CustomCachingConnectionFactory connectionFactory) {
RabbitTemplate clientRabbitTemplate = new RabbitTemplate(connectionFactory);
Connection connection = null;
Channel channel = null;
try {
clientRabbitTemplate.setExchange(FUND_TASK_EXCHANGE);
connection = clientRabbitTemplate.getConnectionFactory().createConnection();
channel = connection.createChannel(false);
beding(channel);
connectionFactory.addReBindingHandler((newchannel) -> beding(newchannel));
logger.info("RabbitMQ连接成功(queue1): exchange:{} queue:{} routing:{}", EXCHANGE, ROUTING_KEY, ROUTING_KEY);
} catch (AmqpException e) {
logger.error("RabbitMQ连接异常(queue1)", e);
} finally {
try {
if (channel != null) {
channel.close();
}
} catch (IOException | TimeoutException e) {
logger.error("RabbitMQ关闭异常(queue1)", e);
}
if (connection != null) {
connection.close();
}
}
return clientRabbitTemplate;
}

private void beding(Channel channel) {
try {
channel.exchangeDeclare(EXCHANGE, "topic", false);
channel.queueDeclare(QUEUE, true, false, false, null);
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
} catch (IOException e) {
logger.error("RabbitMQ操作错误(queue1)", e);
}
}

@Bean(name = "queue1ConsumerListenerContainerFactory")
public SimpleRabbitListenerContainerFactory queue1ConsumerListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("consumerConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(minNumConsumers);
factory.setMaxConcurrentConsumers(maxNumConsumers);
configurer.configure(factory, connectionFactory);
return factory;
}

@Bean
public Queue1Consumer queue1Consumer() {
return new Queue1Consumer();
}
}

5.2 解决消息幂等性问题

MQ消费者的幂等性问题,在于MQ的重试机制,因为网络原因或客户端延迟消费导致重复消费。

5.2.1 RabbitMQ自动重试机制

消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理?

  • 使用重试机制,RabbitMQ默认开启重试机制。

实现原理:

  • @RabbitHandler 注解底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务。
  • 如果Aop使用异常通知拦截获取到异常后,自动实现补偿机制,消息缓存在RabbitMQ服务器端。

注意:

  • 默认会一直重试到消费者不抛异常为止,这样显然不好。我们需要修改重试机制策略,如间隔3s重试一次)

配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
spring:
rabbitmq:
# 连接地址
host: 127.0.0.1
# 端口号
port: 5672
# 账号
username: guest
# 密码
password: guest
# 地址(类似于数据库的概念)
virtual-host: /admin_vhost
# 消费者监听相关配置
listener:
simple:
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000

5.2.2 如何合理选择重试机制?

  • 情况1:消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试?
    • 需要重试,可能是因为网络原因短暂不能访问。
  • 情况2:消费者获取到消息后,抛出数据转换异常,是否需要重试?
    • 不需要重试,因为属于程序bug,需要重新发布版本。

对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录 + 定时任务job进行健康检查 + 人工进行补偿

5.2.3 调用第三方接口自动实现补偿机制

我们知道RabbitMQ在消费者消费发生异常时,会自动进行补偿机制,所以我们(消费者)在调用第三方接口时,可以根据返回结果判断是否成功:

  • 成功:正常消费
  • 失败:手动抛处一个异常,这时RabbitMQ自动给我们做重试 (补偿)。

5.2.4 如何解决消费者幂等性问题,防止重复消费

  • 产生原因:网络延迟传输中,消费者出现异常或者消费者延迟消费,会造成进行MQ重试补偿,在重试过程中,可能会造成重复消费。
  • 解决方案:
    1. 使用全局MessageID判断消费者是否消费过,解决幂等性。
    2. 通过业务逻辑保证唯一,如订单编号。

生产者核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;

public void send(String queueName) {
String msg = "my_fanout_msg:" + System.currentTimeMillis();
//请求头设置消息id(messageId)
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, message);
}
}

消费者核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message) throws Exception {
// 获取消息Id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
//② 判断唯一Id是否被消费,消息消费成功后将id和状态保存在日志表中,我们从(①步骤)表中获取并判断messageId的状态即可
//从redis中获取messageId的value
String value = redisUtils.get(messageId)+"";
if(value.equals("1") ){ //表示已经消费
return; //结束
}
System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
// 获取email参数
String email = jsonObject.getString("email");
// 请求地址
String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
JSONObject result = HttpClientUtils.httpGet(emailUrl);
if (result == null) {
// 因为网络原因,造成无法访问,继续重试
throw new Exception("调用接口失败!");
}
System.out.println("执行结束....");
//① 执行到这里已经消费成功,我们可以修改messageId的状态,并存入日志表(可以存到redis中,key为消息Id、value为状态)
}

5.2.5 SpringBoot整合RabbitMQ应答模式(ACK)

修改配置simple下添加 acknowledge-mode: manual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
spring:
rabbitmq:
# 连接地址
host: 127.0.0.1
# 端口号
port: 5672
# 账号
username: guest
# 密码
password: guest
# 地址(类似于数据库的概念)
virtual-host: /admin_vhost
# 消费者监听相关配置
listener:
simple:
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000
# 开启手动ack
acknowledge-mode: manual

消费者增加代码:

1
2
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动ack
channel.basicAck(deliveryTag, false); //手动签收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//邮件队列
@Component
public class FanoutEamilConsumer {
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.out
.println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8")
+ ",messageId:" + message.getMessageProperties().getMessageId());
// 手动ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收
channel.basicAck(deliveryTag, false);
}
}

5.3 限流

应用场景:如电商系统的抢购,瞬间有巨大流量生成,通过消息队列先进先出的特性,对请求进行削峰,控制消费的速度(限流)避免系统被挤爆。

(1)通过配置文件实现

1
2
3
4
5
6
spring:
rabbitmq:
listener:
simple:
concurrency: 5
max-concurrency: 10

(2)@RabbitListener

利用 @RabbitListener 中的 concurrency 属性进行指定。

1
2
3
4
5
6
7
@Component
public class SpringBootMsqConsumer {
@RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10")
public void receive(Message message) {
System.out.println("receive message:" + new String(message.getBody()));
}
}

最小5个,最大10个消费者。

单个消费者如果收到消息过多也可能存在风险,可以通过设置预取 prefetch count 来控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class SpringBootMsqConsumer {
@RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10",containerFactory = "mqConsumerlistenerContainer")
public void receive(Message message) {
System.out.println("receive message:" + new String(message.getBody()));
}
}

@Configuration
public class RabbitMqConfig {

@Autowired
private CachingConnectionFactory connectionFactory;

@Bean(name = "mqConsumerlistenerContainer")
public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(50);
return factory;
}
}

配置成功后,consumer 单位时间内接收到消息就是50条。


参考:

🔗 《Spring AMQP 中文文档

🔗 《Spring整合rabbitmq实践(一):基础使用配置

🔗 《Spring整合rabbitmq实践(二):扩展功能

🔗 《RabbitMQ解决消息幂等性问题

🔗 《spring-rabbit消费过程解析及AcknowledgeMode选择

🔗 《RabbitMQ连接池——CachingConnectionFactory