RabbitMQ(一)简介和入门

RabbitMQ(一)简介和入门

一. 概述

1.1 什么是消息中间件

消息指在应用间传送的数据。可以只包含字符串、JSON等,也可以包括内嵌对象。

消息队列中间件(Message Queue Middleware,即MQ)指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过消息传递和消息排队模型,可以在分布式环境下扩展进程间通信

两种传递模式:

  • 点对点模式(P2P):基于队列,消息生产者发送消息到队列,消息消费者从队列接受消息。队列使消息的异步传输成为可能。
  • 发布/订阅模式(Pub/Sub):订阅了如何向一个内容节点发布和订阅消息,内容节点被称为主题(topic),可以看作是消息传递的中介,消息发布者将消息发布到某个主题,订阅者从主题订阅消息。主题使发布者和订阅者相互独立,可以在消息的一对多广播时采用。

消息中间件提供了基于存储和转发的应用程序之间的异步数据发生,即应用程序间不直接通信,而是作为中介的消息中间件通信。消息中间件封装了远程过程调用(RPC)和网络通信协议的细节。

1.2 常见的消息中间件

常见的几款开源消息中间件:

  • RabbitMQ:基于AMQP协议,主流消息中间件之一,适合数据量没那么大的项目;
  • Kafka:诞生于LinkedIn公司,吞吐量大,可用性高,常用于大数据领域。
  • ActiveMQ:Apache老牌消息引擎;
  • RocketMQ:阿里巴巴开源产品,Java实现,参考了Kafka的设计,可用性和吞吐量很高,适合于电商和金融互联网场景。
  • Apollo:Apache的ActiveMQ子项目。

1.3 消息中间件的作用

  • 解耦:只要遵守同样的接口约束,我们可以独立的扩展和修改通信两端的处理过程。
  • 冗余(存储):消息中间件可以把数据进行持久化直到被完全处理,可以规避数据丢失的风险。即消息在被从中间件删除前,需要处理系统明确的指出此消息已被处理完成。
  • 扩展性:因为解耦了应用的处理过程,所以提高消息的入队和处理效率很容易,只须增加处理过程,不需改动代码也不需调节参数。
  • 削峰:访问量剧增时,这种情况不一定是常态,如果以此为标准投入资源无疑是巨大的浪费。消息中间件可以使关键组件支撑突发访问压力,不会因为突发的超负荷请求而崩溃。
  • 可恢复性:消息中间件降低了进程间的耦合度,即使一个处理消息的进程挂掉,消息仍可以等到系统恢复后进行处理。
  • 顺序保证:大部分消息中间件支持一定程度上的数据处理的顺序性。
  • 缓冲:消息中间件通过一个缓冲层帮助任务以最高效率执行,有助于控制和优化数据流经过系统的速度。
  • 异步通信:有时应用不想也不需要立即处理消息,消息中间件提供了异步处理机制,应用可以放入一些消息但不立即处理。

1.4 RabbitMQ的发展历程

RabbitMQ基于 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,起源于金融系统,用于在分布式系统中存储转发消息。

之前,商业的消息中间件如微软的MSMQ、IBM的WebSphere等没有创建标准来实现MQ产品的互通或允许应用自定义MQ平台。JMS(Java Message Service)应运而生,试图通过提供公共Java API的方式隐藏单独MQ产品的实际接口,来解决互通问题ActiveMQ就是基于JMS的一种实现,但这种使用单独标准化接口来胶合不同的接口最终还是会暴露问题,所以业内需要一种新的消息通信标准化方案。

2006年6月,Cisco、Redhat、iMatix等联合制定了AMQP标准,它是应用层协议的一个开放标准,来解决众多消息中间件的需求和拓扑结构问题。面向消息的中间件设计,基于此协议的客户端和消息中间件可以传递消息,不受产品、开发语言的限制。

RabbitMQ早期即实现了AMQP的一个特性:使用协议本身就可以对队列和交换器这样的资源进行配置。而商业MQ进行资源配置需要管理终端的特定工具。

1.5 RabbitMQ的特点

  • 可靠性:通过如持久化、传输确认及发布确认等保障可靠性。
  • 灵活路由:消息进入队列前,通过交换器来路由消息。基本的路由功能由内置的交换器来实现,复杂的路由功能可以绑定多个路由器实现,也可以通过插件机制实现自定义的交换器。
  • 扩展性:多个RabbitMQ节点可以组成集群,可以根据实际业务情况动态扩展节点。
  • 高可用性:队列可以在集群的机器上设置镜像,在部分节点出现问题时仍能使用队列。
  • 多种协议:除了原生AMQP协议,还支持STOMP、MQTT等多种消息中间件协议。
  • 多语言客户端:支持几乎所有常用语言,如Java、Python、Ruby、PHP、C#、JavaScript等。
  • 管理界面:提供了一个易用的用户界面,可以监控和管理消息、集群中的节点等。
  • 插件机制:提供了许多插件,可以进行扩展,也可以自定义插件。

二. RabbitMQ的安装和简单使用

2.1 安装

首先要安装Erlang,官网地址:Erlang-Download 。以下基于Linux操作系统安装。

(1)安装Erlang

创建安装目录 /opt/erlang ,并下载安装包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@iZ2zeet6kto8eqx1w7sluzZ ~]# cd /opt
[root@iZ2zeet6kto8eqx1w7sluzZ opt]# ls
containerd gitlab
[root@iZ2zeet6kto8eqx1w7sluzZ opt]# mkdir /erlang
[root@iZ2zeet6kto8eqx1w7sluzZ opt]# cd /erlang
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# wget http://erlang.org/download/otp_src_23.0.tar.gz
--2020-09-21 16:20:16-- http://erlang.org/download/otp_src_23.0.tar.gz
Resolving erlang.org (erlang.org)... 192.121.151.106
Connecting to erlang.org (erlang.org)|192.121.151.106|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 88865562 (85M) [application/gzip]
Saving to: ‘otp_src_23.0.tar.gz’

2% [=> ] 1,907,973 6.64KB/s eta 2h 40m

官网下载速度太慢,访问 RabbitMQ - Erlang 选择对应服务器,复制下载地址:

先删除未下载完成的文件,重新下载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# ls
otp_src_23.0.tar.gz
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# rm -f otp_src_23.0.tar.gz
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# ls
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_23.0.3-1~centos~7_amd64.rpm
--2020-09-21 16:37:11-- https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_23.0.3-1~centos~7_amd64.rpm
Resolving packages.erlang-solutions.com (packages.erlang-solutions.com)... 13.227.21.48, 13.227.21.118, 13.227.21.90, ...
Connecting to packages.erlang-solutions.com (packages.erlang-solutions.com)|13.227.21.48|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 43642720 (42M) [application/x-rpm]
Saving to: ‘esl-erlang_23.0.3-1~centos~7_amd64.rpm’

100%[====================================================================================================================>] 43,642,720 23.1KB/s in 24m 34s

2020-09-21 17:01:49 (28.9 KB/s) - ‘esl-erlang_23.0.3-1~centos~7_amd64.rpm’ saved [43642720/43642720]

通过 yum install 安装程序:(会安装到默认目录)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# ls
esl-erlang_23.0.3-1~centos~7_amd64.rpm
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# yum install esl-erlang_23.0.3-1~centos~7_amd64.rpm
Loaded plugins: fastestmirror
Examining esl-erlang_23.0.3-1~centos~7_amd64.rpm: esl-erlang-23.0.3-1.x86_64
Marking esl-erlang_23.0.3-1~centos~7_amd64.rpm to be installed
Resolving Dependencies
--> Running transaction check
---> Package esl-erlang.x86_64 0:23.0.3-1 will be installed
--> Processing Dependency: libodbc.so.2()(64bit) for package: esl-erlang-23.0.3-1.x86_64
Determining fastest mirrors
base | 3.6 kB 00:00:00
docker-ce-stable | 3.5 kB 00:00:00
epel | 4.7 kB 00:00:00
......
gitlab_gitlab-ee/x86_64/primary | 2.6 MB 00:00:06
gitlab_gitlab-ee 623/623

......

Dependencies Resolved

......

Total size: 107 M
Total download size: 462 k
Installed size: 107 M
Is this ok [y/d/N]: y

......

Complete!

# 通过rpm查看软件是否安装
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# rpm -qa | grep erlang
esl-erlang-23.0.3-1.x86_64
# 查看软件安装目录
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# rpm -ql esl-erlang-23.0.3-1.x86_64
/usr/bin/cpu_sup
/usr/bin/ct_run
/usr/bin/dialyzer
/usr/bin/epmd
/usr/bin/erl
/usr/bin/erl_call
/usr/bin/erlc
/usr/bin/escript
/usr/bin/etop
/usr/bin/getop
/usr/bin/heart
/usr/bin/memsup
/usr/bin/run_erl
/usr/bin/run_test
/usr/bin/start_erl
/usr/bin/start_webtool
/usr/bin/to_erl
/usr/bin/typer
/usr/lib/erlang/COPYRIGHT
/usr/lib/erlang/Install
# 或者直接查看目录
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# rpm -qal | grep erlang

若出现报错:No curses library functions found 。需要安装 ncurses:

1
yum install ncurses-devel

若继续出现如 No XXXXX found 的提示,可以自行安装对应包,之后再次尝试安装Erlang直到提示安装完毕。

修改 /etc/profile 配置文件,添加如下环境变量:

1
2
3
4
5
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# cd /
[root@iZ2zeet6kto8eqx1w7sluzZ /]# vi /etc/profile

export ERLANG_HOME=/usr/lib/erlang
export PATH=$PATH:/usr/bin

修改完,执行如下命令使配置文件生效:

1
[root@iZ2zeet6kto8eqx1w7sluzZ /]# source /etc/profile

通过 erl 命令验证是否安装成功:

1
2
[root@iZ2zeet6kto8eqx1w7sluzZ erlang]# erl -version
Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version 11.0.3

(2)安装RabbitMQ

首先下载安装包,官网地址:RabbitMQ

然后可以将RabbitMQ和Erlang安装在同一目录(/opt):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
--2020-09-21 17:39:13-- https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
Resolving www.rabbitmq.com (www.rabbitmq.com)... 104.20.11.224, 172.67.16.25, 104.20.10.224, ...
Connecting to www.rabbitmq.com (www.rabbitmq.com)|104.20.11.224|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5075721 (4.8M) [application/x-redhat-package-manager]
Saving to: ‘rabbitmq-server-3.6.15-1.el7.noarch.rpm’

100%[====================================================================================================================>] 5,075,721 1.73MB/s in 2.8s

2020-09-21 17:39:18 (1.73 MB/s) - ‘rabbitmq-server-3.6.15-1.el7.noarch.rpm’ saved [5075721/5075721]

......

[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# ls
rabbitmq-server-3.6.15-1.el7.noarch.rpm
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# yum install rabbitmq-server-3.6.15-1.el7.noarch.rpm

......

Complete!

# 通过rpm查看软件是否安装
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# rpm -qa | grep rabbitmq
rabbitmq-server-3.6.15-1.el7.noarch
# 查看软件安装目录
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# rpm -ql rabbitmq-server-3.6.15-1.el7.noarch
/etc/logrotate.d/rabbitmq-server
/etc/rabbitmq
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server-ha
......
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/sbin/rabbitmq-plugins
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/sbin/rabbitmq-server
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/sbin/rabbitmqctl
/usr/lib/systemd/system/rabbitmq-server.service
/usr/lib/tmpfiles.d/rabbitmq-server.conf
/usr/sbin/rabbitmq-plugins
/usr/sbin/rabbitmq-server
/usr/sbin/rabbitmqctl
......

同样修改 /etc/profile 文件,添加如下环境变量:

1
2
3
4
[root@iZ2zeet6kto8eqx1w7sluzZ /]# vi /etc/profile

export RABBITMQ_HOME=/usr/lib/rabbitmq
export PATH=$PATH:/usr/sbin

同样使配置文件生效:

1
[root@iZ2zeet6kto8eqx1w7sluzZ /]# source /etc/profile

(3)卸载

卸载前先停掉rabbitmq服务,执行命令

1
$ service rabbitmq-server stop

查看rabbitmq安装的相关列表

1
$ yum list | grep rabbitmq

卸载rabbitmq已安装的相关内容

1
$ yum -y remove rabbitmq-server.noarch

查看erlang安装的相关列表

1
$ yum list | grep erlang

卸载erlang已安装的相关内容

1
2
$ yum -y remove erlang-*
$ yum -y remove erlang.x86_64

2.2 运行

任意开启Shell,运行RabbitMQ:

1
2
3
4
5
6
# detached参数使RabbitMQ以守护进程的方式在后台运行,不会因为Shell的关闭而影响服务
rabbitmq-server -detached
# 查看RabbitMQ是否正常启动
rabbitmqctl status
# 查看集群信息,当前只有一个节点
rabbitmqctl cluster_status

运行失败:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# rabbitmq-server -detached
Warning: PID file not written; -detached was passed.
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# rabbitmqctl status
Status of node rabbit@iZ2zeet6kto8eqx1w7sluzZ
Error: unable to connect to node rabbit@iZ2zeet6kto8eqx1w7sluzZ: nodedown

DIAGNOSTICS
===========

attempted to contact: [rabbit@iZ2zeet6kto8eqx1w7sluzZ]

rabbit@iZ2zeet6kto8eqx1w7sluzZ:
* connected to epmd (port 4369) on iZ2zeet6kto8eqx1w7sluzZ
* epmd reports: node 'rabbit' not running at all
no other nodes on iZ2zeet6kto8eqx1w7sluzZ
* suggestion: start the node

current node details:
- node name: 'rabbitmq-cli-57@iZ2zeet6kto8eqx1w7sluzZ'
- home dir: /var/lib/rabbitmq
- cookie hash: 63rISb5yZX97fhoF8LdjSg==

首先排除是否是防火墙的问题:(不是这个原因)

1
2
3
4
5
6
7
# 查看防火墙开放端口
[root@iZ2zeet6kto8eqx1w7sluzZ /]# firewall-cmd --zone=public --list-ports
8025/tcp 6725/tcp
[root@iZ2zeet6kto8eqx1w7sluzZ /]# systemctl stop firewalld.service
[root@iZ2zeet6kto8eqx1w7sluzZ /]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
Job for rabbitmq-server.service failed because the control process exited with error code. See "systemctl status rabbitmq-server.service" and "journalctl -xe" for details.

删除 /var/lib/rabbitmq/mnesia 目录下的文件,再重启:(不是这个原因)

1
2
3
4
5
6
7
[root@iZ2zeet6kto8eqx1w7sluzZ /]# cd /var/lib/rabbitmq/mnesia
[root@iZ2zeet6kto8eqx1w7sluzZ mnesia]# ls -AF
rabbit@iZ2zeet6kto8eqx1w7sluzZ.pid
[root@iZ2zeet6kto8eqx1w7sluzZ mnesia]# rm -rf rabbit@iZ2zeet6kto8eqx1w7sluzZ.pid
[root@iZ2zeet6kto8eqx1w7sluzZ mnesia]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
Job for rabbitmq-server.service failed because the control process exited with error code. See "systemctl status rabbitmq-server.service" and "journalctl -xe" for details.

查看详细错误信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
[root@iZ2zeet6kto8eqx1w7sluzZ /]# systemctl status rabbitmq-server.service
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: activating (auto-restart) (Result: exit-code) since Tue 2020-09-22 10:19:51 CST; 3s ago
Process: 27630 ExecStop=/bin/sh -c while ps -p $MAINPID >/dev/null 2>&1; do sleep 1; done (code=exited, status=0/SUCCESS)
Process: 27488 ExecStop=/usr/sbin/rabbitmqctl stop (code=exited, status=0/SUCCESS)
# rabbitmq-server启动失败
Process: 27187 ExecStart=/usr/sbin/rabbitmq-server (code=exited, status=1/FAILURE)
Main PID: 27187 (code=exited, status=1/FAILURE)

Sep 22 10:19:51 iZ2zeet6kto8eqx1w7sluzZ systemd[1]: Failed to start RabbitMQ broker.
Sep 22 10:19:51 iZ2zeet6kto8eqx1w7sluzZ systemd[1]: Unit rabbitmq-server.service entered failed state.
Sep 22 10:19:51 iZ2zeet6kto8eqx1w7sluzZ systemd[1]: rabbitmq-server.service failed.


[root@iZ2zeet6kto8eqx1w7sluzZ mnesia]# journalctl -xe
Sep 22 10:26:04 iZ2zeet6kto8eqx1w7sluzZ rabbitmq-server[8731]: /var/log/rabbitmq/rabbit@iZ2zeet6kto8eqx1w7sluzZ-sasl.log
Sep 22 10:26:04 iZ2zeet6kto8eqx1w7sluzZ rabbitmq-server[8731]: Stack trace:
Sep 22 10:26:04 iZ2zeet6kto8eqx1w7sluzZ rabbitmq-server[8731]: []
Sep 22 10:26:05 iZ2zeet6kto8eqx1w7sluzZ rabbitmq-server[8731]: {"init terminating in do_boot",noproc}
Sep 22 10:26:05 iZ2zeet6kto8eqx1w7sluzZ rabbitmq-server[8731]: init terminating in do_boot (noproc)
Sep 22 10:26:05 iZ2zeet6kto8eqx1w7sluzZ rabbitmq-server[8731]: Crash dump is being written to: erl_crash.dump...done
Sep 22 10:26:05 iZ2zeet6kto8eqx1w7sluzZ systemd[1]: rabbitmq-server.service: main process exited, code=exited, status=1/FAILURE
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: Stopping and halting node rabbit@iZ2zeet6kto8eqx1w7sluzZ
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: Error: unable to connect to node rabbit@iZ2zeet6kto8eqx1w7sluzZ: nodedown
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: DIAGNOSTICS
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: ===========
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: attempted to contact: [rabbit@iZ2zeet6kto8eqx1w7sluzZ]
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: rabbit@iZ2zeet6kto8eqx1w7sluzZ:
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: * connected to epmd (port 4369) on iZ2zeet6kto8eqx1w7sluzZ
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: * epmd reports: node 'rabbit' not running at all
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: no other nodes on iZ2zeet6kto8eqx1w7sluzZ
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: * suggestion: start the node
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: current node details:
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: - node name: 'rabbitmq-cli-55@iZ2zeet6kto8eqx1w7sluzZ'
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: - home dir: .
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ rabbitmqctl[9029]: - cookie hash: 63rISb5yZX97fhoF8LdjSg==
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ systemd[1]: Failed to start RabbitMQ broker.
-- Subject: Unit rabbitmq-server.service has failed
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
--
-- Unit rabbitmq-server.service has failed.
--
-- The result is failed.
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ systemd[1]: Unit rabbitmq-server.service entered failed state.
Sep 22 10:26:06 iZ2zeet6kto8eqx1w7sluzZ systemd[1]: rabbitmq-server.service failed.
lines 1389-1419/1419 (END)

# 尝试重启
[root@iZ2zeet6kto8eqx1w7sluzZ sbin]# rabbitmq-server restart


BOOT FAILED
===========

Error description:
noproc

Log files (may contain more information):
/var/log/rabbitmq/rabbit@iZ2zeet6kto8eqx1w7sluzZ.log
/var/log/rabbitmq/rabbit@iZ2zeet6kto8eqx1w7sluzZ-sasl.log

Stack trace:
[]

=INFO REPORT==== 22-Sep-2020::20:41:05.655469 ===
Error description:
noproc

Log files (may contain more information):
/var/log/rabbitmq/rabbit@iZ2zeet6kto8eqx1w7sluzZ.log
/var/log/rabbitmq/rabbit@iZ2zeet6kto8eqx1w7sluzZ-sasl.log

Stack trace:
[]


{"init terminating in do_boot",noproc}
init terminating in do_boot (noproc)

Crash dump is being written to: erl_crash.dump...done

根据提示 noproc 考虑是否是版本不匹配,rabbitmq - erlang 官网查询确认是否匹配:

  • Erlang:23.0.3
  • RabbitMQ:3.6.15

卸载当前RabbitMQ,再重新下载一个3.8.8版本的RabbitMQ:

1
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.8/rabbitmq-server-3.8.8-1.el7.noarch.rpm

阿里云服务器下载GitHub实在是太慢了,选择用Docker拉取镜像:

1
2
3
4
5
6
7
[root@iZ2zeet6kto8eqx1w7sluzZ ~]# service docker restart
Redirecting to /bin/systemctl restart docker.service
[root@iZ2zeet6kto8eqx1w7sluzZ ~]# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Unable to find image 'rabbitmq:3-management' locally
3-management: Pulling from library/rabbitmq
5d9821c94847: Downloading [=======================> ] 12.53MB/26.7MB
......

仍然很慢,还是直接下载RPM文件,加上断点:

1
wget -c https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.8/rabbitmq-server-3.8.8-1.el7.noarch.rpm

下载成功后,重新安装:

1
2
3
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# ls -AF
rabbitmq-server-3.8.8-1.el7.noarch.rpm
[root@iZ2zeet6kto8eqx1w7sluzZ rabbitmq]# yum install rabbitmq-server-3.8.8-1.el7.noarch.rpm

重新启动:(终于成功)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@iZ2zeet6kto8eqx1w7sluzZ ~]# rabbitmq-server -detached
[root@iZ2zeet6kto8eqx1w7sluzZ ~]# rabbitmqctl status
Status of node rabbit@iZ2zeet6kto8eqx1w7sluzZ ...
Runtime

OS PID: 4074
OS: Linux
Uptime (seconds): 89
Is under maintenance?: false
RabbitMQ version: 3.8.8
Node name: rabbit@iZ2zeet6kto8eqx1w7sluzZ
Erlang configuration: Erlang/OTP 23 [erts-11.0.3] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:64] [hipe]
Erlang processes: 275 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60
......

# 查看集群状态
[root@iZ2zeet6kto8eqx1w7sluzZ ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@iZ2zeet6kto8eqx1w7sluzZ ...
......

2.3 生产和消费消息

RabbitMQ的默认用户名和密码都是 guest,此账户只能本地访问,远程网络访问会受限,所以我们要首先添加一个用户,并设置访问权限

1
2
3
4
5
6
# 新增用户
rabbitmqctl add_user root root
# 设置所有权限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
# 设置管理员角色
rabbitmqctl set_user_tags root administrator

若在使用RabbitMQ的过程中有 com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN 类似报错,很可能就是账户的问题。

1
2
3
4
Exception in thread "main" com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured
java.net.SocketException: Socket Closed

在Java项目中引入RabbitMQ客户端的Maven依赖:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>

创建消息生产者,发送一条消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RabbitProducer {
private static final String EXCHANGE_NAME = "exchange_demo";
private static final String ROUTING_KEY = "routingkey_demo";
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "xxx.xxx.xxx.xxx";
private static final int PORT = 5672;//RabbitMQ 服务端默认端口号为 5672

public static void main(String[] args) throws IOException,
TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection(); //创建连接
Channel channel = connection.createChannel(); //创建信道
// 创建一个 type direct 、持久化的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//创建一个持久化、非排他的、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//发送一条持久化的消息 hello world !
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
//关闭资源
channel.close();
connection.close();
}
}

然后是消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class RabbitConsumer {
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "xxx.xxx.xxx.xxx";
private static final int PORT = 5672;

public static void main(String[] args) throws IOException,
TimeoutException, InterruptedException {
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root");

//这里的连接方式与生产者的 demo 略有不同,注意辨别区别
Connection connection = factory.newConnection(addresses); //创建连接
final Channel channel = connection.createChannel(); //创建信道
channel.basicQos(64); //设置客户端最多接收未被 ack 的消息的个数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("recv message : " + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
//等待回调函数执行完毕之后 关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}

控制台打印:

1
recv message : Hello World!

此处采用继承 DefaultConsumer 的方式来实现消费,也可以使用 QueueingConsumer 来实现,但会有些隐患,并且这种实现已被废弃。


三. Rabbit入门

RabbitMQ的模型架构是什么?AMQP协议是什么?两者间又有何种联系?消息从生产者发出到消费者消费的过程要经历什么?

3.1 RabbitMQ基本概念

(1)生产者和消费者模型

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ就好比由邮局、邮箱和邮递员组成的一个系统。

从计算机术语层面来说, RabbitMQ模型更像是一种交换机模型

消息一般包含2个部分:

  • 消息体:消息体也可以称之为 payload ,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。当然可以进一步对这个消息体进行序列化操作。
  • 标签(Label):用来表述这条消息,比如一个交换器的名称和一个路由键。

大致流程:

  • 生产者(Producer)将业务数据封装(序列化)成消息(指定Exchange和RoutingKey等),把消息交由RabbitMQ,即发送给Broker (AMQP协议中对应命令为Basic.Publish)。
  • Broker (消息中间件的服务节点),大多数情况下可以将 RabbitMQ Broker 看作一台 RabbitMQ 服务器。RabbitMQ之后会根据标签把消息发送给感兴趣的消费者(Consumer)
  • 消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体( payload )。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
  • 消费者受到消息体后(AMQP协议中对应命令为Basic.Consume或Basic.Get)进行反序列化,得到对应的业务数据,并进行业务处理。

消费者进行业务处理可以不和接收消息的逻辑使用同一线程,比如使用一个线程去接收消息存入内存(BlockingQueue),使用另一个线程从内存读取数据,这样可以进一步解耦,提高整体处理效率。

(2)队列

队列(Queue)是 RabbitMQ 的内部对象,用来存储消息消息只能存储在队列中,这与Kafka这种消息中间件相反(Kafka消息存储在主题topic逻辑层面),相对应的队列逻辑只是主题实际存储文件中的位移标识。

多个消费者可以订阅同一个队列,队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者处理,不是每个消费者都收到所有的消息并处理

RabbitMQ不支持队列层面的广播消费,如果需要只能进行二次开发。

(3)交换器、路由键、绑定、交换器类型

上面我们理解为生产者将消息直接投递到队列上,但其实并非如此。

交换器(Exchange),生产者会先将消息发送到交换器,交换器负责将消息路由到一个或多个队列中。若路由不到,可以返回给生产者或直接丢弃。

路由键(RoutingKey),生产者将消息发给交换器时,会指定一个路由键,用来指定消息的路由规则,可以决定消息流向哪里。路由键需要与交换器类型绑定键(BindingKey)联合使用。

绑定键(BindingKey),属于路由键的一种,通过绑定键将交换器和队列关联起来。某些情形下绑定键与路由键可以看作同一个东西。

可以这样区别路由键和绑定键:

  • 使用绑定时,需要的路由键是 BindingKey 。
    • 涉及的客户端方法:channel.exchangeBindchannel.queueBind
    • 对应AMQP命令:Exchange.BindQueue.Bind
  • 发送消息时,需要的路由键是 RoutingKey 。
    • 涉及的客户端方法:channel.basicPublish
    • 对应AMQP命令:Basic.Publish

交换器类型

  • fanout:会把发送到该交换器的消息路由到所有绑定的队列中。

  • direct:会把消息路由到BindingKey和RoutingKey完全匹配的队列中。

    如下案例:

    1
    2
    3
    	//路由键为warning,消息会路由到Queue1和Queue2
    channel.basicPublish(EXCHANGE_NAME, "warning", MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());

    若设置路由键为“info”或“debug”,消息只会路由到Queue2。

  • topic:在匹配规则上相比direct进行了扩展,也是把消息路由到BindingKey和RoutingKey匹配的队列中,但匹配规则包括:

    • RoutingKey和BindingKey为 . 分隔的字符串,如 com.rabbitmq.client
    • BindingKey中可以存着两个特殊字符串 *# ,用于模糊匹配,* 用于匹配一个单词,# 用于匹配多规格单词。

    如下案例:

    • 路由键 com.rabbitmq.client 的消息同时路由到Queue1和Queue2;
    • 路由键 com.hidden.client 的消息只路由到Queue2;
    • 路由键 com.hidden.demo 的消息只路由到Queue2;
    • 路由键 java.rabbitmq.demo 的消息只路由到Queue1;
    • 路由键 java.util.concurrent 的消息会被丢弃或返回给生产者;

  • headers:不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时会制定一组键值对,消息到交换器时也有键值对形式的headers,对比若完全匹配就路由到此队列。这一类型性能很差且不实用

(4)RabbitMQ运转流程

生产者发送消息(最初状态):

  1. 生产者连接到 RabbitMQ Broker 建立一个连接(Connection),开启一个信道(Channel) ;
  2. 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等;
  3. 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等;
  4. 生产者通过路由键将交换器和队列绑定起来;
  5. 生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息;
  6. 相应的交换器根据接收到的路由键查找相匹配的队列;
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者。
  9. 关闭信道。
  10. 关闭连接。

消费者接收消息的过程:

  1. 消费者连接到 RabbitMQ Broker ,建立一个连接(Connection),开启一个信道(Channel);
  2. 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
  3. 等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
  4. 消费者确认(ack)接收到的消息。
  5. RabbitMQ 从队列中删除相应己经被确认的消息。
  6. 关闭信道。
  7. 关闭连接。

(5)连接和信道

连接(Connection),即一条TCP连接。当TCP连接建立起来,客户端紧接着创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是虚拟链接,RabbitMQ处理每条AMQP指令都是通过信道完成

信道的作用?

一个应用程序会有很多线程需要从MQ中消费消息或生产消息,所以要建立很多TCP连接。对于操作系统而言,建立和销毁TCP连接都需要昂贵的开销,所以 RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择TCP连接复用,既减少性能开销也便于管理。

每个线程持有一个信道,信道复用了TCP连接,而 RabbitMQ 还可以保证每个线程的私密性和持有独立的连接一样。当单个信道流量很大时,多个信道复用一个连接就会产生性能瓶颈,此时可以开辟多个连接并进行均摊。

3.2 AMQP协议

AMQP协议的模型架构和 RabbitMQ的模型架构一样:生产者将消息发送给交换器,交换器与队列绑定,当生产者发送消息时所携带的 RoutingKey 与绑定时的 BindingKey 相匹配时,消息即被存入相应的队列之中。消费者可以订阅相应的队列来获取消息。

AMQP协议包括三层:

  • Module Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如客户端可以使用 Queue.Declare 命令声明一个队列或者使用 Basic.Consume 订阅消费一个队列中的消息。
  • Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
  • Transport Layer :位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP说到底还是一个通信协议,都会涉及到报文交互,从 low level 举例来说,AMQP 本身是应用层的协议,其填充于 TCP 协议层的数据部分。而从 high-level 来说 AMQP 是通过协议命令进行交互的。AMQP 协议可以看作一系列结构化命令的集合,这里的命令代表一种操作,类似于 HTTP 中的方法(GET、POST、PUT、DELETE 等)。

(1)AMQP生产者流转过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Connection connection = factory.newConnection(); //创建连接
Channel channel = connection.createChannel(); //创建信道
// 创建一个 type direct 、持久化的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//创建一个持久化、非排他的、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//发送一条持久化的消息 hello world !
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
//关闭资源
channel.close();
connection.close();

(2)AMQP消费者者流转过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//这里的连接方式与生产者的 demo 略有不同,注意辨别区别
Connection connection = factory.newConnection(addresses); //创建连接
final Channel channel = connection.createChannel(); //创建信道
channel.basicQos(64); //设置客户端最多接收未被 ack 的消息的个数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("recv message : " + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
//等待回调函数执行完毕之后 关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();

(3)AMQP命令概览

AMQP命令:

名称 是否包含内容体 对应客户端中的方法 简要描述
Connection.Start factory.newConnection 建立连接相关
Connection.Start-Ok 同上 同上
Connection.Tune 同上 同上
Connection.Tune-Ok 同上 同上
Connection.Open 同上 同上
Connection.Open-Ok 同上 同上
Connection.Close connection.close 关闭连接
Connection.Close-Ok 同上 同上
Channel.Open connection.openChannel 开启信道
Channel.Open-Ok 同上 同上
Channel.Close channel.close 关闭信道
Channel.Close-Ok 同上 同上
Exchange.Declare channel.exchangeDeclare 声明交换器
Exchange.Declare-Ok 同上 同上
Exchange.Delete channel.exchangeDelete 删除交换器
Exchange.Delete-Ok 向上 同上
Exchange.Bind channel.exchangeBind 交换器与交换器绑定
Exchange.Bind-Ok 同上 同上
Exchange.Unbind channel.exchangeUnbind 交换器与交换器解绑
Exchange.Unbind-Ok 同上 同上
Queue.Declare channel.queueDeclare 声明队列
Queue.Declare-Ok 同上 同上
Queue.Bind channel.queueBind 队列与交换器绑定
Queue.Bind-Ok 同上 同上
Queue.Purge channel.queuePurge 清除队列中的内容
Queue.Purge-Ok 同上 同上
Queue.Delete channel.queueDelete 删除队列
Queue.Delete-Ok 同上 向上
Queue.Unbind channel.queueUnbind 队列与交换器解绑
Queue.Unbind-Ok 同上 同上
Basic.Qos channel.basicQos 设置未被确认消费的个数
Basic.Qos-Ok 同上 同上
Basic.Consume channel.basicConsume 消费消息(推模式)
BasiιConsume-Ok 同上 同上
Basic.Cancel channel.basicCancel 取消
Basic.Cancel-Ok 同上 同上
Basic.Publish channel.basicPublish 发送消息
Basic.Return 未能成功路由的消息返回
Basic.Deliver Broker 推送消息
Basic.Get channel.basicGet 消费消息(拉模式〉
Basic.Get-Ok 同上 同上
Basic.Ack channel.basicAck 确认
Basic.Reject channel.basicReject 拒绝(单条拒绝)
Basic.Recover channel.basicRecover 请求 Broker 重新发送未被确认的消息
Basic.Recover-Ok 向上 同上
Basic.Nack channel.basicNack 拒绝(可批量拒绝〉
Tx.Select channel.txSelect 开启事务
Tx.Select-Ok 同上 同上
Tx.Commit channel.txCommit 事务提交
Tx.Commit-Ok 同上 同上
Tx.Rollback channel.txRollback 事务回滚
Tx.Rollback-Ok 同上 同上
Confirm Select channel.confinnSelect 开启发送端确认模式
Confirm.Select-Ok 同上 同上

参考:

🔗 《RabbitMQ实战指南》