RabbitMQ常见问题之高可用

RabbitMQ常见问题之高可用

一、集群分类

RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两
种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ3.8版本
以后,推出了新的功能——仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

二、普通集群搭建

1. 准备

建立如下文件夹结构

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
./cluster/
├── docker-compose.yml
├── mq1
│ ├── .erlang.cookie
│ └── conf
│ └── rabbitmq.conf
├── mq2
│ ├── .erlang.cookie
│ └── conf
│ └── rabbitmq.conf
└── mq3
├── .erlang.cookie
└── conf
└── rabbitmq.conf

2. 配置

rabbitmq.conf都写入以下内容

language-shell
1
2
3
4
5
6
7
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
vm_memory_high_watermark.absolute = 524288000

.erlang.cookie都写入以下内容

language-txt
1
SUGWXEQPRCPYJAVYPNZY

集群的所有节点的.erlang.cookie需要保持一致才能互相信任,具体内容并不固定,可以随便新建一个rabbitmq容器去查看其.erlang.cookie然后复制使用即可。

docker-compose.yml写入以下内容

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
version: "3.8"

networks:
rabbitmq-normal-cluster:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.30.3.0/24

services:
mq1:
container_name: mq1
hostname: mq1
image: rabbitmq:3-management
environment:
- RABBITMQ_DEFAULT_USER=rabbitmq
- RABBITMQ_DEFAULT_PASS=rabbitmq
volumes:
- ./mq1/conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie:ro
ports:
- "8071:5672"
- "8081:15672"
networks:
rabbitmq-normal-cluster:
ipv4_address: 172.30.3.11

mq2:
container_name: mq2
hostname: mq2
image: rabbitmq:3-management
environment:
- RABBITMQ_DEFAULT_USER=rabbitmq
- RABBITMQ_DEFAULT_PASS=rabbitmq
volumes:
- ./mq2/conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./mq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie:ro
ports:
- "8072:5672"
- "8082:15672"
networks:
rabbitmq-normal-cluster:
ipv4_address: 172.30.3.12

mq3:
container_name: mq3
hostname: mq3
image: rabbitmq:3-management
environment:
- RABBITMQ_DEFAULT_USER=rabbitmq
- RABBITMQ_DEFAULT_PASS=rabbitmq
volumes:
- ./mq3/conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./mq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie:ro
ports:
- "8073:5672"
- "8083:15672"
networks:
rabbitmq-normal-cluster:
ipv4_address: 172.30.3.13

3. 运行

language-bash
1
docker-compose -p rabbitmq-c up -d

三、镜像集群

1. 介绍

镜像模式的配置有3种模式:

ha-mode ha-params 效果
准确模式exactly 队列的副本量count 集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
all (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
nodes node names 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。

2. 启用方式

三种模式启动方式分别如下,基于普通集群之上,命令均需要在单个容器内部执行。

language-bash
1
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
language-bash
1
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
language-bash
1
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

3. 测试

这里以exactly为例,在mq1中执行rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'后,所有前缀为twoqueue都会有1个主queue和1个副本。

language-txt
1
2
3
root@mq1:/# rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
Setting policy "ha-two" for pattern "^two\." to "{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}" with priority "0" for vhost "/" ...
root@mq1:/#

来到localhost:8081管理页,找到admin->policies可以看到策略生效。

来新建一个two.test.queue,可以看到这是一个拥有副本的queue

四、仲裁队列

1. 介绍

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致

上一章中想要一个镜像队列还要执行各种命令,遵循规定,现在不用了。

2. 创建

java使用目前只能基于@Bean创建

language-java
1
2
3
4
5
6
7
8
@Bean
public Queue quorumQueue(){

return QueueBuilder
.durable("quorum.queue2")
.quorum()
.build();
}

五、Java连接RabbitMQ集群方式

Java使用RabbitMQ集群application.yml中需要修改address

language-yml
1
2
3
4
spring:
rabbitmq:
host: localhost # rabbitMQ的ip地址
port: 5672 # 端口
language-yml
1
2
3
spring:
rabbitmq:
addresses: localhost:8071, localhost:8072, localhost:8073
Docker运行RabbitMQ并使用SpringAMQP操作

Docker运行RabbitMQ并使用SpringAMQP操作

一、RabbitMQ运行

拉取docker镜像

language-bash
1
docker pull rabbitmq:3-management

基础运行命令

language-bash
1
2
3
4
5
6
7
8
docker run \
-e RABBITMQ_DEFAULT_USER=rabbitmq \
-e RABBITMQ_DEFAULT_PASS=rabbitmq \
--name rabbitmq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

15672是网页后台管理系统,5672是给服务用的。

官方入门教程可以看这里RabbitMQ Tutorials — RabbitMQ

二、整合SpringAMQP

1. 引入依赖

language-xml
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
language-yml
1
2
3
4
5
6
spring:
rabbitmq:
host: localhost # rabbitMQ的ip地址
port: 5672 # rabbitMQ服务端口
username: rabbitmq
password: rabbitmq

三、测试

这边采用常用的消费者-生产者模型,使用默认的Direct类型exchange。不懂的可以先继续学习rabbitmq再来实践。

1. 消费者

在消费者服务随便新建一个listener

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@Component
public class SpringRabbitListener {


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "simple.queue"),
exchange = @Exchange(name = "simple.exchange"),
key = "simple"
))
public void listenSimpleQueue(String msg) {

log.info("消费者接收到simple.queue的消息:【" + msg + "】");
}
}

2. 生产者

在生产者服务的Test模块新建一个测试

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {

String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend("simple.exchange", "simple", message);
}
}

3. 运行

先启动消费者。
登录http://localhost:15672/,可以看到simple.exchangesimple.queue已被创建。

然后启动测试testSendMessage2SimpleQueue,出现类似以下日志,消息发送成功。

language-txt
1
2
12:17:43:771  INFO 21064 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
12:17:43:808 INFO 21064 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#50f40653:0/SimpleConnection@536d97f8 [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 59641]

消费者出现类似以下日志,收到消息。

language-txt
1
2
3
12:17:31:074  INFO 8924 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a27cb34:0/SimpleConnection@671facee [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 59634]
12:17:31:141 INFO 8924 --- [ main] cn.itcast.mq.ConsumerApplication : Started ConsumerApplication in 1.011 seconds (JVM running for 1.462)
12:17:43:848 INFO 8924 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消费者接收到simple.queue的消息:【hello, spring amqp!】