RabbitMQ(十)扩展-消息追踪和负载均衡

RabbitMQ(十)扩展

一. 消息追踪

使用消息中间件过程难免会遇到消息丢失的情况:

  • 可能是生产者与Broker断开了连接并且也没有任何重试机制;
  • 可能是消费者在处理消息时发生了异常,但提前进行了ack;
  • 可能是交换器没有与任何队列进行绑定,生产者感知不到或未采取相应的措施;
  • 可能是RabbitMQ本身的集群策略导致消息的丢失。

需要有一个好的机制来跟踪记录消息的投递过程,方便开发和运维人员快速定位问题。

1.1 Firehose

Firehose可以记录每一次发送或消费消息的记录。原理是将生产者投递给MQ的消息或MQ投递给消费者的消息按指定的格式发送到默认交换器 amq.rabbitmq.trace 上(topic类型)。路由键为 publish.{exchangename}deliver.{queuename} 。前者对应生产者投递到交换器的消息,后者对应消费者从队列获取的消息。

开启/关闭Firehose:

1
2
$ rabbitmqctl trace_on [-p vhost]
$ rabbitmqctl trace_off [-p vhost]

Firehose(多少会影响一点性能)默认关闭,且重启MQ后重置为默认。

创建7个队列,2个交换器与其中两个队列绑定,最后将交换器 amq.rabbitmq.trace 与其余5个队列绑定:

分别用客户端向exchange和exchange.another发送一条消息”trace test payload”,然后再用客户端消费队列queue和queue.another的消息。

此时queue1中有2条消息,queue2中有2条消息,queue3中有4条消息,而queue4和queue5中只有一条消息。

  • 在向exchange发送一条消息后,amq.rabbitmq.trace 分别向queue1、queue3和queue4发送一条内部封装消息。
  • 在向exchange.another发送一条消息后,queue1和queue3多出一条消息。
  • 消费queue的时候,queue2、queue3和queue5多出一条消息。
  • 消费queue.another时,queue2和queue3多出一条消息。

综述:

  • publish.# 匹配发送到所有交换器的消息。
  • deliver.# 匹配消费所有队列的消息。
  • # 则包含前两者。

当有客户端发送或消费消息时,Firehose会自动封装相应的消息体,并添加详细的headers属性:

消费queue时,消息会封装为:

headers:

  • exchange_name:发送消息的交换器。
  • routing_keys:对应路由键列表。
  • properties:消息本身的属性,如delivery_mode为2表示消息需要持久化处理。

1.2 rabbitmq_tracing插件

rabbitmq_tracing插件相当于Firehose的GUI版本,同样能跟踪MQ中的消息流入流出,同样对消息进行封装并存入trace文件之中。

开启/关闭:

1
2
$ rabbitmq-plugins enable rabbitmq_tracing
$ rabbitmq-plugins disable rabbitmq_tracing

Web管理界面Admin会多出一栏Tracing:

添加Trace:

  • Name:trace任务名称。
  • Format:输出的消息日志格式,有Text和JSON。
  • Max payload bytes:每条消息的最大限制,单位为B,达到上限会被截断。
  • Pattern:设置匹配的模式,# 匹配所有消息流入流出,publish.# 匹配所有消息流入,deliver.# 匹配所有消息流出。

TEXT格式:

JSON格式:

添加完毕后,根据匹配的规则将相应的消息日志输出到对应日志文件中,默认路径为 /var/tmp/rabbitmq-tracing ,也可以在页面直接点击Trace log files直接查看对应文件。

添加两个trace任务:

对应文件:

会多出两个队列:

查看第一个队列,绑定的交换器就是 amq.rabbitmq.trace

可以看出 rabbitmq_tracing插件和Firehose如出一辙,只是多了层GUI方便使用和管理。

(1)开启Tracing日志

  1. 使用comp用户连接linux服务器

  2. 输入指令 rabbitmqctl status 确认MQ状态

    image-20210331171126692

    command not found:不能识别命令,需要添加配置信息

    1
    2
    3
    4
    5
    6
    vim .bash_profile

    export PATH=$PATH:/home/comp/rabbitmq/rabbitmq_server-3.7.5/sbin

    # 修改配置信息,后使其生效
    source .bash_profile
  3. 通过指令 rabbitmq-plugins list 查看MQ安装的插件,E* 表示已启用。对应文件路径:/home/comp/rabbitmq/rabbitmq_server-3.7.5/sbin

    image-20210331171303814

  4. 找到 rabbitmq_tracing 确认是否启用,若未启用,则通过指令 rabbitmq-plugins enable rabbitmq_tracing 开启。

    image-20210331171436966

  5. 通过指令 rabbitmqctl trace_on 开启trace。

  6. 虚拟主机 server 开启trace:rabbitmqctl trace_on -p server ,添加完成后会多一个交换器:

    image-20210331171813344

  7. RabbitMQ管理平台新建一个Trace,添加trace追踪文件信息:

    image-20210331172359303

  8. 创建的Trace文件:

    image-20210331172219234

    若该步骤出错,如:

    image-20210907173131624

    MQ的tracing插件默认使用guest用户,给它开下server权限应该就行了

    image-20210907183235365

    image-20210907183453592

    MQ日志路径:/home/comp/rabbitmq/log

    1
    2
    3
    4
    # 日志表示账号密码有问题
    2021-09-07 17:55:17.143 [info] <0.2303.0> Enabling tracing for vhost 'server'
    2021-09-07 17:55:17.173 [error] <0.3847.0> Supervisor {<0.3847.0>,rabbit_tracing_consumer_sup} had child consumer started with rabbit_tracing_consumer:start_link([{vhost,<<"server">>},{name,<<"server-trace-log">>},{format,<<"text">>},{pattern,<<"#">>},{<<"for...">>,...},...]) at undefined exit with reason no match of right hand value {error,{auth_failure,"Refused"}} in rabbit_tracing_consumer:init/1 line 58 in context start_error
    2021-09-07 17:55:17.173 [error] <0.3848.0> CRASH REPORT Process <0.3848.0> with 0 neighbours exited with reason: no match of right hand value {error,{auth_failure,"Refused"}} in rabbit_tracing_consumer:init/1 line 58 in gen_server:init_it/6 line 352
  9. 重新核算,触发消息发送。

  10. 查看步骤8的Trace文件,确认消息是否发送。

如果给guest增加权限server仍不能创建Trac,直接重建guest用户:

  1. 删除用户guest

    image-20210907194009362

  2. 再创建用户guest,用户名密码相同。

    image-20210907194048653

    image-20210907194122446

  3. 增加权限:

    image-20210907194148405

  4. 重新创建Tracing:

    image-20210907194213958

    创建成功:

    image-20210907194229913

(2)测试队列

除了开启Tracing排查消息是否投递到MQ,还可以新加一个测试Queue,绑定相同的交换器和路由键。因为无消费者,所以消息都堆积在队列中,通过GetMessage(n) + nack来获取所有消息,也可以ack或purge清空消息。

1.3 案例:可靠性检测

生产者将消息发送到交换器时,实际上是由生产者所连接的信道将消息上的路由键同交换器的绑定列表比较,之后再路由消息到相应的队列进程中。

那么在信道对比完绑定列表后,将消息路由到队列并保存的过程,是否会因为MQ的内部缺陷而引起偶发性的消息丢失?我们可以使用消息追踪机制来验证这一疑问,一个交换器通过同一个路由键绑定多个队列,生产者客户端采用同一个路由键发送消息到交换器,检测绑定的队列是否有消息丢失。

(1)前期准备

  1. 开启tracing插件。
  2. 创建一个交换器exchange和三个队列,都由同一个路由键rk绑定。
  3. 创建三个trace分别采用 #.queue1#.queue2#.queue3 的Pattern追踪各个队列。
  4. 创建一个trace:trace_publish采用 publish.exchange 追踪流入交换器的消息。

(2)验证过程

开启一个生产者现场,持续发消息到交换器,消息格式为当前时间戳+自增计数,从而在数据丢失时可以快速在trace日志中定位到。注意设置mandatory参数,防止消息路由不到对应队列而造成对消息丢失的误判。

消息发送前以[msg, QUEUE_NUM]的形式存入一个全局msgMap中,用来在消费端做数据验证,为3表示创建了三个队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
	
private static HashMap<String, Integer> msgMap = new HashMap<>();

private static BlockingQueue<String> log2disk = new LinkedBlockingDeque<>();

public static class ProducerThread implements Runnable {
private Connection connection;
public ProducerThread(Connection connection) {
this.connection = connection;
}

@Override
public void run() {
try {
Channel channel = connection.createChannel();
channel.addReturnListener((replyCode, replyText, exchange, routingKey, basicProperties, body) -> {
String errorInfo = "Basic.Return: " + new String(body) + "\n";
try {
log2disk.put(errorInfo);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(errorInfo);
});
int count = 0;
while (true) {
String message = System.currentTimeMillis() + "-" + count++;
// 存储消息一定要在发消息之前,否则消息被消费时msgMap中不一定会有相应的消息
synchronized (msgMap) {
msgMap.put(message, QUEUE_NUM);
}
channel.basicPublish(exchange, routingKey, true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
try {
// QPS=10,可以自适应调节,不宜过高防止队列堆积严重
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}

然后,开启三个消费者线程分别消费是三个队列的消息,从存储的msgMap中寻找是否有相应消息,若有则计数减一并删除;若无,则报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static class ConsumerThread implements Runnable {
private Connection connection;
private String queue;
public ConsumerThread(Connection connection, String queue) {
this.connection = connection;
this.queue = queue;
}

@Override
public void run() {
try {
final Channel channel = connection.createChannel();
channel.basicQos(64);
channel.basicConsume(this.queue, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
synchronized (msgMap) {
if (msgMap.containsKey(msg)) {
int count = msgMap.get(msg);
count--;
if (count > 0) {
msgMap.put(msg, count);
} else {
msgMap.remove(msg);
}
} else {
String errorInfo = "unknown msg : " + msg + "\n";
try {
log2disk.put(errorInfo);
System.out.println(errorInfo);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}

channel.basicAck(envelope.getDeliveryTag(), false);
}
});
} catch (IOException ex) {
ex.printStackTrace();
}
}
}

最后,开启一个检测进程,每隔10分钟检测一下msgMap中的数据。对比消息中的时间戳和当前时间,如果差值超过10分钟,则说明可能有消息丢失。(该结论的前提是队列没有消息堆积,且消费消息的速度不低于生产消息的速度):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static class DetectThread implements Runnable {
@Override
public void run() {
while (true) {
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException ex) {
ex.printStackTrace();
}

synchronized (msgMap) {
long now = System.currentTimeMillis();
for (Map.Entry<String, Integer> entry : msgMap.entrySet()) {
String msg = entry.getKey();
if (now - parseTime(msg) >= 10 * 60 * 1000) {
String findLossInfo = "We find loss msg: " +
msg + " , now the time is: " +
now + ", and this msg still has " +
entry.getValue() + " missed\n";
try {
log2disk.put(findLossInfo);
System.out.println(findLossInfo);
msgMap.remove(msg);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}
}
}

public static Long parseTime(String msg) {
int index = msg.indexOf('-');
String timeStr = msg.substring(0, index);
return Long.parseLong(timeStr);
}

如果检测到msgMap有超过10分钟的未处理消息,还不能说明有消息丢失,使用trace,如果看到[msg, count]这条数据有如下情况:

  • 考虑count=3的情况。需要检索trace文件trace_publish.log来进一步验证,如果其中没搜到相应的消息说明消息未发送到交换器;如果搜索到消息,则要进一步检索trace1.log、trace2.log和trace3.log。如果3个文件不是全部都有此消息,则说明消息丢失。
  • 考虑0<count<3的情况。检索trace1.log、trace2.log和trace3.log。如果3个文件不是全部都有此消息,则说明消息丢失。
  • 考虑count=0的情况。说明检查程序异常,可以忽略。

主线程部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Connection connection = connectionFactory.newConnection();
// 用来读取log2disk这个BlockingQueue中存储的异常日志,然后进行存盘处理,可以用log4j等代替
PrintLogThread printLogThread = new PrintLogThread(logFileAddr);
ProducerThread producerThread = new ProducerThread(connection);
ConsumerThread consumerThread1 = new ConsumerThread(connection, "queue1");
ConsumerThread consumerThread2 = new ConsumerThread(connection, "queue2");
ConsumerThread consumerThread3 = new ConsumerThread(connection, "queue3");
DetectThread detectThread = new DetectThread();
System.out.println("starting check msg loss...");
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(printLogThread);
executorService.submit(producerThread);
executorService.submit(consumerThread1);
executorService.submit(consumerThread2);
executorService.submit(consumerThread3);
executorService.submit(detectThread);
executorService.shutdown();

二. 负载均衡

大量业务访问、高并发请求的场景,需要用高性能的服务器来提升MQ的负载能力。假设一个集群中有3个节点,所有客户端都与node1建立TCP连接,则node1的网络负载会大大增加,其他节点又由于没有那么多负载而造成资源浪费。

对于RabbitMQ,客户端只会和集群中一个节点建立连接而不是整个集群。引入负载均衡后,各个客户端的连接会分摊到各个节点:

负载均衡(Load Balance)是一种计算机网络技术,用于在多个计算机(集群)、网络连接、CPU、磁盘驱动器或其他资源中分配负载,以达到最佳资源使用、最大化吞吐率、最小响应时间及避免过载的目的。

使用带有负载均衡的多个服务器组件,取代单一的组件,可以通过冗余提高可靠性。

负载均衡分为:

  • 软件负载均衡:在一个或多个交互的网络系统中的多台服务器上安装一个或多个相应的负载均衡软件来实现的一种均衡负载技术。软件安装比较方便,技术配置简单,操作方便,成本很低。
  • 硬件负载均衡:多台服务器间安装相应的负载均衡设备,即负载均衡器。相比软件会有更好的负载均衡效果,但成本较高,适用于流量较大的大型网站系统。

2.1 客户端内部实现负载均衡

客户端连接可以任选一种负载均衡算法实现。

(1)轮询法

将请求按顺序轮流分配到后端服务器上,不关心每台服务器实际的连接数和当前系统负载,调用 RoundRobin.getConnectionAddress() 获取连接地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 轮询法
*/
public class RoundRobin {
private static List<String> list = new ArrayList<String>(){{
add("192.168.0.2");
add("192.168.0.3");
add("192.168.0.4");
}};

private static int pos = 0;
private static final Object lock = new Object();
public static String getConnectionAddress() {
String ip = null;
synchronized (lock) {
ip = list.get(pos);
if (++pos >= list.size()) {
pos = 0;
}
}
return ip;
}
}

(2)加权轮询法

不同服务器的配置可能和当前系统的负载不同,抗压能力也不相同。所以应该给配置高、负载低的机器配置更高的权重,让其处理更多请求;

(3)随机法

通过随机算法,根据服务器列表大小值来任选一台服务器访问。由概率理论可知,随着调用服务端次数增多,实际效果会越来越接近轮询法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 随机法
*/
public class RandomAccess {

private static List<String> list = new ArrayList<String>(){{
add("192.168.0.2");
add("192.168.0.3");
add("192.168.0.4");
}};

public static String getConnectionAddress() {
Random random = new Random();
int pos = random.nextInt(list.size());
return list.get(pos);
}
}

(4)加权随机法

与加权轮询法相同,根据机器配置和负载分配不同权重,区别是其按照权重随机分配而非顺序。

(5)源地址哈希法

根据获取的客户端IP地址,通过哈希函数计算得到的一个数值,用此数值对服务器列表的大小进行取模运算,得到的结果便是客户端要访问服务器的序号。

采用该方法,同一IP地址的客户端在服务器列表不变的情况下,每次都会分配给同一台服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 源地址哈希法
*/
public class IpHash {

private static List<String> list = new ArrayList<String>(){{
add("192.168.0.2");
add("192.168.0.3");
add("192.168.0.4");
}};

public static String getConnectionAddress() throws UnknownHostException {
int ipHashCode = InetAddress.getLocalHost().getHostAddress().hashCode();
int pos = ipHashCode & list.size();
return list.get(pos);
}
}

(6)最小连接数法

由于服务器配置不尽相同,对于请求的处理有快有慢,根据服务器当前连接情况,动态的选取其中当前积压连接数最少的一台服务器来处理当前的请求,尽可能的提高对服务器的利用效率。

2.2 使用HAProxy实现负载均衡

HAProxy提供了高可用性、负载均衡及基于TCP和HTTP应用的代理,支持虚拟主机,是一种免费、快速且可靠的解决方案。实现了一种事件驱动、单一进程模型,支持较大的并发连接数。

(1)安装

官网下载官方文档

将下载的压缩包复制到 /opt 目录下,与RabbitMQ同一个目录,并解压:

1
2
3
4
5
6
7
8
9
$ tar zxvf haproxy-1.7.8.tar.gz
$ cd haproxy-1.7.8
# make指令将其编译为可执行程序,TARGET选择目标平台
$ make TARGET=generic
# 编译后目录下有名为haproxy的可执行文件
# 修改系统配置
$ vi /etc/profile
export PATH=$PATH:/opt/haproxy-1.7.8/haproxy
$ source /etc/profile

(2)配置

HAProxy使用单一配置文件来定义所有属性:

  • HAProxy主机:192.168.0.9:5671
  • RabbitMQ1:192.168.0.2:5672
  • RabbitMQ2:192.168.0.3:5672
  • RabbitMQ3:192.168.0.4:5672
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 全局配置
global
# 日志输出配置,所有日志都记录在本机,通过local0输出
log 127.0.0.1 local0 info
# 最大连接数
maxconn 4096
# 改变当前的工作目录
chroot /opt/haproxy-1.7.8
# 以指定的UID运行
uid 99
# 以指定的GID运行
gid 99
# 以守护进程方式运行 #debug #quiet
daemon
#debug
# 当前进程pid文件
pidfile /opt/haproxy-1.7.8/haproxy.pid
# 默认配置
defaults
# 应用全局的日志配置
log global
# 默认的模式mode{tcp|http|health}
mode tcp
# 日志类别tcplog
option tcplog
# 不记录健康检查日志信息
option dontlognull
# 3次失败则认为服务不可用
retries 3
# 每个进程可用的最大连接数
maxconn 2000
# 连接超时
timeout connect 5s
# 客户端超时
timeout client 120s
# 服务端超时
timeout server 120s
# 绑定配置
listen rabbitmq_cluster :5671
# 配置TCP模式
mode tcp
# 简单的轮询
balance roundrobin
# RabbitMQ集群节点配置
server rmq_node1 192.168.0.2:5672 check inter 5000 rise 2 fall 3 weight 1
server rmq_node2 192.168.0.3:5672 check inter 5000 rise 2 fall 3 weight 1
server rmq_node3 192.168.0.4:5672 check inter 5000 rise 2 fall 3 weight 1
#haproxy监控页面地址
listen monitor :8100
mode http
option httplog
stats enabnle
stats uri /stats
stats refresh 5s

server rmq_node1 192.168.0.2:5672 check inter 5000 rise 2 fall 3 weight 1

  • server <name> :定义RabbitMQ服务的内部标识,此处rmq_node1指有含义的字符串名称,而非节点名称。
  • <ip>:<port> :定义RabbitMQ服务连接的IP地址和端口号。
  • check inter <value> :定义每隔多少毫秒检查RabbitMQ服务是否可用。
  • rise <value> :定义RabbitMQ服务在发生故障后,需要多少次健康检查才能被再次确认可用。
  • fall <value> :定义需要经历多少次失败的健康检查后,HAProxy才会停止使用此RabbitMQ服务。
  • weight <value> :定义当前RabbitMQ服务的权重。

调用 haproxy -f haproxy.cfg 命令运行服务后,可以查看相关界面 http://192.168.0.9:8100/stats

2.3 使用Keepalived实现高可靠负载均衡

如果HAProxy主机突然宕机或网卡失效,MQ集群没有故障,但所有外界客户端的连接都会断开(单点问题),HAProxy无法保证负载均衡服务的可靠性。

通过Keepalived能够通过自身健康检查、资源接管功能做高可用(双机热备),实现故障转移。

Keepalived采用VRRP(Vistual Router Redundancy Protocol,虚拟路由冗余协议),以软件的形式实现服务的热备功能。通常将两台Linux服务器组出一个热备组(Master和Backup),同一时间内热备组只有一个Master提供服务,Master会虚拟出一个公用的虚拟IP地址(VIP),只存在于Master上并对外提供服务。当Keepalived检测到Master宕机或服务故障,备份服务器自动接管VIP并成为Master,原Master从热备组中被移除待恢复后再自动加入到热备组,默认再抢占成Master起到故障转移的功能。

Keepalived工作在OSI的:

  • 第三层:定期向热备组中的服务器发送一个ICMP数据包来判断某台服务器是否故障,如果故障则从热备组移除。
  • 第四层:以TCP端口的状态判断服务器是否故障,比如检查MQ端口5672,如果故障则从热备组中移除。
  • 第七层:根据用户设定的策略(通常是一个自定义的检测脚本)判断服务器上的程序是否正常运行,如果故障则从热备组中移除。

(1)安装

官网下载

解压并安装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$ tar zxvf keepalived-1.3.5.tar.gz
$ cd keepalived-1.3.5
# 根据系统选择对应的启动方式
$ ./configure --prefix=/opt/keepalived --with-init=SYSV
$ make
$ make install
# 将安装后的keepalived加入系统服务(注意千万不要输错命令)
$ cp /opt/keepalived/etc/rc.d/init.d/keepalived /etc/init.d/
$ cp /opt/keepalived/etc/sysconfig/keepalived /etc/sysconfig
$ cp /opt/keepalived/sbin/keepalived /usr/sbin/
$ chmod +x /etc/init.d/keepalived
$ chkconfig --add keepalived
$ chkconfig keepalived on
# keepalived会默认读取/etc/keepalived/keepalived.conf 配置文件
$ mkdir /etc/keepalived
$ cp /opt/keepalived/etc/keepalived/keepalived.conf /etc/keepalived
# 之后就可以重启、启动、关闭和查看keepalived状态
$ service keepalived restart
$ service keepalived start
$ service keepalived stop
$ service keepalived status

(2)配置

安装流程已创建了 /etc/keepalived 目录,并将 keepalived.conf 文件复制到此供keepalived读取。更改此文件使keepalived与HAProxy结合。

两台keepalived服务器通过VRRP交互,对外虚拟出VIP,keepalived与HAProxy部署在同一台机器上,一一配对,从而通过keepalived来实现HAProxy的双机热备。

调用链路:

  • 客户端通过VIP创建通信链路;
  • 通信链路通过keepalived的Master节点路由到对应的HAProxy上;
  • HAProxy通过负载均衡算法将负载分发到集群中各个节点上。

修改配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# keepalived配置文件
global_defs {
router_id NodeA # 路由ID、主/备的ID不能相同
}
# 自定义监控脚本
vrrp_script chk_haproxy {
script "/etc/keepalived/check_haproxy.sh"
interval 5
weight 2
}
vrrp_instance VI_1 {
state MASTER # keepalived的角色
interface eth0 # 指定监测网卡
virtual_router_id 1
priority 100 # 优先级,BACKUP机器上优先级要小于此值
advert_int 1 # 设置主备之间的检测时间,单位为s
authentication {
auth_type PASS
auth_pass root123
}
track_script {
chk_haproxy
}
virtual_ipaddress { # VIP地址,可以设置多个
192.168.0.10
}
}

BACKUP机器大致相同,需要修改 global_defs 的 router_id 为如NodeB;修改 vrrp_script chk_haproxy 的state为BACKUP;最后将priority设置为小于100的值。注意virtual_router_id要保持一致。

为了防止HAProxy挂掉后,keepalived还在正常工作而未切换到Backup,需要补充一个脚本来检测HAProxy的状态。当服务挂掉,脚本自动重启服务,若不成功则关闭keepalived服务,从而可以切换到Backup。该脚本对应 vrrp_script chk_haproxy 中的 script :

1
2
3
4
5
6
7
8
#!/bin/bash
if [ $(ps -C haproxy --no-header | wc -l) -eq 0];then
haproxy -f /opt/haproxy-1.7.8/haproxy.cfg
fi
sleep 2
if [ $(ps -C haproxy --no-header | wc -l) -eq 0];then
service keepalived stop
fi

如此配置后,使用 service keepalived start 启动两台服务。客户端通过VIP地址 192.168.0.10 来连接MQ服务。

(3)查看运行情况

查看Keepalived日志输出:

1
$ tail -f /var/log/messages -n 200

查看添加的VIP:

1
$ ip add show

一般情况下,双机热备已足够满足应用需求。

2.4 使用Keepalived+LVS实现负载均衡

LVS,Linux Virtual Server,即Linux虚拟服务器。LVS是4层负载均衡,建立在OSI模型的传输层之上。LVS支持TCP/UDP的负载均衡,相对于如DNS域名流转解析、应用层负载的调度、客户端的调度等更高效。

通过LVS可以实现高可伸缩的、高可用的网络服务。LVS主要由3部分组成:

  • 负载调度器(Load Balancer/Director):负责将客户请求发送到一组服务器上执行,客户则认为服务来自一个IP地址。
  • 服务器池(Server Pool / RealServer):一组真正执行客户端请求的服务器。
  • 共享存储(Shared Storage):为服务器池提供一个共享的存储区,方便其拥有相同内容,提供相同服务。

LVS的负载均衡方式:

  • VS/NAT:Virtual Server via Network Address Translation 的简称,最简单的方式,所有RealServer只需将自己的网关指向Director。客户端可以是任意的OS,但该方式下一个Director只能带动有限的RealServer。
  • VS/TUN:Virtual Server via Direct Routing 的简称,IP隧道(IP Tunneling)是将一个IP报文封装在另一个IP报文的技术,使目标为一个IP地址的数据报文能够封装和转发到另一个IP地址。IP隧道技术又叫IP封装技术(IP encapsulation)。
  • VS/DR:Virtual Server via Direct Routing 的简称,通过改写报文中的MAC地址部分来实现。Director和RealServer必须在物理上有一个网卡通过不间断的局域网相连。RealServer上绑定的VIP配置在各自Non-ARP网络设备上,Director的VIP地址对外可见;而RealServer则不可见,其地址既可以是内部地址,也可以是真实地址。

LVS可以代替HAProxy,不需要额外的配置文件,直接集成在Keepalived的配置中,修改 /etc/keepalived/keepalived.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# keepalived配置文件(Master)
global_defs {
router_id NodeA # 路由ID、主/备的ID不能相同
}
vrrp_instance VI_1 {
state MASTER # keepalived的角色
interface eth0 # 指定监测网卡
virtual_router_id 1
priority 100 # 优先级,BACKUP机器上优先级要小于此值
advert_int 1 # 设置主备之间的检测时间,单位为s
authentication {
auth_type PASS
auth_pass root123
}
track_script {
chk_haproxy
}
virtual_ipaddress { # VIP地址,可以设置多个
192.168.0.10
}
}

virtual_server 192.168.0.10 5672 { # 设置虚拟服务器
delay_loop 6 # 设置运行情况检查时间,单位为秒
# 设置负载调度算法,有rr、wrr、lc、wlc、lblc、lblcr、dh、sh共8种
lb_algo wrr # 加权轮询
lb_kind DR # 设置LVS实现的负载均衡机制为VS/DR
# 指定在一定的时间内来自统一IP的连接将会被转发到同一RealServer中
persistence_timeout 50
protocal TCP # 指定转发协议类型,包括TCP/UDP两种
# LVS三大部分之一,此处特指RabbitMQ服务
real_server 192.168.0.2 5672 { # 配置服务节点
weight 1 # 权重
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 5672
}
}
real_server 192.168.0.3 5672 {
weight 1
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 5672
}
}
real_server 192.168.0.4 5672 {
weight 1
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 5672
}
}
}

# 为RabbitMQ Management插件设置负载均衡
virtual_server 192.168.0.10 15672 {
delay_loop 6
lb_algo wrr
lb_kind DR
persistence_timeout 50
protocal TCP
real_server 192.168.0.2 5672 {
weight 1
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 15672
}
}
real_server 192.168.0.3 5672 {
weight 1
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 15672
}
}
real_server 192.168.0.4 5672 {
weight 1
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 15672
}
}
}

Backup配置参考上一节。

  • LVS主要工作是提供调度算法,把客户端请求按需求调度在RealServer中,
  • Keepalived主要工作是提供LVS控制器的一个冗余,并对RealServer进行健康检查,发现不健康的从集群剔除。
  • RealServer只负载提供服务。

VS/DR模式下需要在RealServer上配置VIP,因为LVS把客户端的包转发给RealServer时,因为包的目的IP地址是VIP,在RealServer收到包后发现目的地址不是自己系统的IP会认为其不是发给自己的,就会丢弃此包,需要把这个IP地址绑定到网卡下。当发送应答包给客户端,RealServer就会把包的源和目的地址调换,直接恢复给客户端。

为所有RealServer的lo:0网卡创建启动脚本(vim /opt/realserver.sh)绑定VIP地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/bin/bash
VIP=192.168.0.10
/etc/rc.d/init.d/functions

case "$1" in
start)
/sbin/ipconfig lo:0 $VIP netmask 255.255.255.255 broadcast $VIP
/sbin/route add -host $VIP dev lo:0
echo "1" >/proc/sys/net/ipv4/conf/lo/arp_ignore
echo "2" >/proc/sys/net/ipv4/conf/lo/arp_announce
echo "1" >/proc/sys/net/ipv4/conf/all/arp_ignore
echo "2" >/proc/sys/net/ipv4/conf/all/arp_announce
sysctl -p >/dev/null 2>&1
echo "RealServer Start Ok"
;;
stop)
/sbin/ipconfig lo:0 down
/sbin/route del -host $VIP dev lo:0
echo "0" >/proc/sys/net/ipv4/conf/lo/arp_ignore
echo "0" >/proc/sys/net/ipv4/conf/lo/arp_announce
echo "0" >/proc/sys/net/ipv4/conf/all/arp_ignore
echo "0" >/proc/sys/net/ipv4/conf/all/arp_announce
;;
status)
islothere=`/sbin/ipconfig lo:0 | grep $VIP | wc -l`
isrothere=`netstat -rn | grep "lo:0" | grep $VIP | wc -l`
if [ $islothere -eq 0]
then
if [ $isrothere -eq 0]
then
echo "LVS of RealServer Stoped."
else
echo "LVS of RealServer Running."
fi
else
echo "LVS of RealServer Running."
fi
;;
*)
echo "Usage:$0{start|stop}"
exit 1
;;
esac

掩码为 255.255.255.255 表示广播地址是自身,不会将ARP发送到实际自己该属于的广播域,防止与LVS的VIP冲突而导致IP地址冲突。

/opt/realserver.sh 文件添加可执行权限后,运行 /opt/realserver.sh start 命令后可以通过 ip add show 命令查看网卡lo:0的状态,注意与Keepalived节点的网卡状态进行区分。


参考:

🔗 《RabbitMQ实战指南》