从RSA角度出发解析JWT原理

从RSA角度出发解析JWT原理

在今天的数字化世界中,安全地传递信息变得越来越重要。JSON Web TokenJWT)作为一种流行的开放标准,为简化服务器与客户端之间的安全交流提供了一种高效的方式。本文旨在深入解析JWT的工作原理,并通过示例演示如何使用JWT进行安全通信。我们将从JWT的基本组成部分讲起,探讨公密钥加密的原理,解释为什么JWT是安全的,以及如何验证JWT的有效性。

一、JWT介绍

1. JWT组成部分

JWT由三个部分组成,它们分别是头部(Header)、载荷(Payload)、和签名(Signature)。通过将这三部分用点(.)连接起来,形成了一个完整的JWT字符串。例如:xxxxx.yyyyy.zzzzz

2. 头部(Header)

头部通常由两部分组成:令牌类型(typ)和签名算法(alg)。例如:

language-json
1
2
3
4
5
{

"kid": "33bd1cad-62a6-4415-89a6-c2c816f3d3b1",
"alg": "RS256"
}
  • kid (Key ID):密钥标识符,用于指明验证JWT签名时使用的密钥。在含有多个密钥的系统中,这可以帮助接收者选择正确的密钥进行验证。这里的值33bd1cad-62a6-4415-89a6-c2c816f3d3b1是一个UUID,唯一标识了用于签名的密钥。
  • alg (Algorithm):指明用于签名的算法,这里是RS256,表示使用RSA签名算法和SHA-256散列算法。这种算法属于公钥/私钥算法,意味着使用私钥进行签名,而用公钥进行验证。

3. 载荷(Payload)

载荷包含了所要传递的信息,这些信息以声明(claims)的形式存在。声明可以是用户的身份标识,也可以是其他任何必要的信息。载荷示例:

language-json
1
2
3
4
5
6
7
8
9
10
{

"sub": "XcWebApp",
"aud": "XcWebApp",
"nbf": 1707373072,
"iss": "http://localhost:63070/auth",
"exp": 1707380272,
"iat": 1707373072,
"jti": "62e885c5-6b3f-49a2-aa10-b2e872a52b33"
}
  • sub (Subject):主题,标识了这个JWT的主体,通常是指用户的唯一标识。这里XcWebApp可能是一个应用或用户标识。
  • aud (Audience):受众,标识了这个JWT的预期接收者。这里同样是XcWebApp,意味着这个JWT是为XcWebApp这个应用或服务生成的。
  • nbf (Not Before):生效时间,这个时间之前,JWT不应被接受处理。这里的时间是Unix时间戳格式,表示JWT生效的具体时间。
  • iss (Issuer):发行者,标识了这个JWT的发行方。这里是http://localhost:63070/auth,表明JWT由本地的某个认证服务器发行。
  • exp (Expiration Time):过期时间,这个时间之后,JWT不再有效。同样是Unix时间戳格式,表示JWT过期的具体时间。
  • iat (Issued At):发行时间,JWT创建的时间。这提供了JWT的时间信息,也是Unix时间戳格式。
  • jti (JWT ID):JWT的唯一标识符,用于防止JWT被重放(即两次使用同一个JWT)。这里的值是一个UUID,确保了JWT的唯一性。

4. 签名(Signature)

签名是对头部和载荷的加密保护,确保它们在传输过程中未被篡改。根据头部中指定的算法(例如HS256),使用私钥对头部和载荷进行签名。

二、深入理解JWT签名验证

1. 签名生成

  1. 哈希处理 :首先对JWT的头部和载荷部分(经过Base64编码并用.连接的字符串)进行哈希处理,生成一个哈希值。这个步骤是为了确保数据的完整性,即使是微小的改动也会导致哈希值有很大的不同。
  2. 私钥加密哈希值 :然后使用发行者的私钥对这个哈希值进行加密,生成的结果就是JWT的签名部分。这个加密的哈希值(签名)附加在JWT的后面。

2. 签名验证

  1. 哈希处理 :接收方收到JWT后,会独立地对其头部和载荷部分进行同样的哈希处理,生成一个哈希值。
  2. 公钥解密签名 :接收方使用发行者的公钥对签名(加密的哈希值)进行解密,得到另一个哈希值。
  3. 哈希值比较比较这两个哈希值。如果它们相同,就证明了JWT在传输过程中未被篡改,因为只有相应的私钥能够生成一个对应的、能够通过公钥解密并得到原始哈希值的签名。

3. 为什么JWT是安全的

由于破解RSA算法的难度极高,没有私钥就无法生成有效的签名,因此无法伪造签名。这就是为什么使用公钥进行签名验证是安全的原因。

三、如何验证JWT是否有效

对于一个JWT,我们可以使用Base64解码,获取前两部分信息,可以进行token是否过期等验证,具体取决于前两部分具体内容,这是可以人为设置的。
对于签名部分,可以用公钥去验证其有效性(即是否被纂改)。

四、 Why JWT?

为什么是JWT?
首先是不可伪造的安全性,其次,因为你会发现,只要有公钥就可以验证JWT这种token,这也就意味着对于微服务来说,任意微服务都有能力自行验证JWT,而不需要额外的验证模块。这种自校验没有用到网络通信,性能十分好。同时,JWT有时间限制,一定程度上也提高了最坏情况的安全性。

SpringCloud + Nacos环境下抽取Feign独立模块并支持MultipartFile

SpringCloud + Nacos环境下抽取Feign独立模块并支持MultipartFile

一、前提条件和背景

1. 前提

已经部署好Nacos,本文以192.168.101.65:8848为例。

2. 背景

有两个微服务mediacontent,都已经注册到Nacos
后者通过引用Feign实现远程调用前者。
两个微服务都被分为3个子模块:api、service、model,对应三层架构。

请根据自身情况出发阅读本文。

二、Feign模块

1. 依赖引入

首先需要Feign依赖和扩展。

language-xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<!--   openfeign     -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>
<!--feign支持Multipart格式传参-->
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>io.github.openfeign.form</groupId>
<artifactId>feign-form-spring</artifactId>
<version>3.8.0</version>
</dependency>

需要测试依赖(可选),为了MockMultipartFile类才引入的,非必需功能。

language-xml
1
2
3
4
5
6
7
<!--    测试    -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>6.1.2</version>
<scope>compile</scope>
</dependency>

其次需要涉及到的微服务的数据模型,根据个人情况而定。

如果只想要它们的数据模型,而不引入不必要的依赖,可以使用通配符*全部过滤掉。

language-xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<!--   数据模型pojo     -->
<dependency>
<groupId>com.xuecheng</groupId>
<artifactId>xuecheng-plus-media-model</artifactId>
<version>0.0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.xuecheng</groupId>
<artifactId>xuecheng-plus-content-model</artifactId>
<version>0.0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

2. application.yaml配置

填入以下内容,大抵为超时熔断处理。(可选),甚至可以留空。

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
feign:
hystrix:
enabled: true
circuitbreaker:
enabled: true
hystrix:
command:
default:
execution:
isolation:
thread:
timeoutInMilliseconds: 30000
ribbon:
ConnectTimeout: 60000
ReadTimeout: 60000
MaxAutoRetries: 0
MaxAutoRetriesNextServer: 1

3. 扩展支持MultipartFile

新建一个配置类,如下,
主要是Encoder feignEncoder()使得Feign支持MultipartFile类型传输。
MultipartFile getMultipartFile(File file)是一个工具方法,和配置无关。

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class MultipartSupportConfig {


@Autowired
private ObjectFactory<HttpMessageConverters> messageConverters;

@Bean
@Primary//注入相同类型的bean时优先使用
@Scope("prototype")
public Encoder feignEncoder() {

return new SpringFormEncoder(new SpringEncoder(messageConverters));
}

//将file转为Multipart
public static MultipartFile getMultipartFile(File file) {

try {

byte[] content = Files.readAllBytes(file.toPath());
MultipartFile multipartFile = new MockMultipartFile(file.getName(),
file.getName(), Files.probeContentType(file.toPath()), content);
return multipartFile;
} catch (IOException e) {

e.printStackTrace();
XueChengPlusException.cast("File->MultipartFile转化失败");
return null;
}
}
}

4. 将media-api注册到feign

新建一个类,如下。
@FeignClient(value)要和服务名称对上,即media模块spring.application.name=media-api
@FeignClient(path)要和服务前缀路径对上,即media模块server.servlet.context-path=/media
然后MediaClient中的方法定义尽量和media模块对应的controller函数保持一致。

language-java
1
2
3
4
5
6
7
8
9
10
11
@FeignClient(value = "media-api", path = "/media")
public interface MediaClient {


@RequestMapping(value = "/upload/coursefile", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public UploadFileResultDto upload(
@RequestPart("filedata") MultipartFile file,
@RequestParam(value = "objectName", required = false) String objectName
);
}

三、Media模块

被调用方media模块无需做什么修改。

四、Content模块

测试在content-api上操作。

1. 引入依赖

content模块需要引入刚才feign模块的依赖。

language-xml
1
2
3
<dependency>
<!-- 根据自身情况引入 -->
</dependency>

2. 启用FeignClient

在启动类上加上@EnableFeignClients注解。

3. 测试

新建测试类,如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.xuecheng.content.service.jobhandler;

import com.xuecheng.feign.client.MediaClient;
import com.xuecheng.media.model.dto.UploadFileResultDto;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.web.multipart.MultipartFile;

@SpringBootTest
class CoursePublishTaskTest {

@Autowired
MediaClient mediaClient;

@Test
void generateCourseHtml() {

MultipartFile file = new MockMultipartFile(
"filedata",
"filename.txt",
"text/plain",
"Some dataset...".getBytes()
);
UploadFileResultDto upload = mediaClient.upload(file, "/static-test/t-1");
System.out.println(upload);
}
}

启动Media模块,启动测试方法,
具体的Debug和检验,可以通过Media模块对应的controller函数打印日志,检查是否通过MediaClient 被触发。

五、需要澄清的几点

  1. Feign模块不需要注册到Nacos且不需要服务发现
    正确。feign-client模块只是一个包含Feign客户端接口的库,它自身并不是一个独立的微服务。因此,它不需要注册到Nacos,也不需要服务发现功能。这个模块只是被其他微服务模块(如content模块)作为依赖引入。这样做的主要目的是为了代码的重用和解耦,允许任何微服务通过引入这个依赖来调用其他服务。

  2. 只有调用者(如content模块)需要使用@EnableFeignClients注解,被调用者(如media模块)不需要
    正确。@EnableFeignClients注解是用来启用Feign客户端的,它告诉Spring Cloud这个服务将会使用Feign来进行远程服务调用。因此,只有需要使用Feign客户端的服务(在这个例子中是content模块)需要添加这个注解。而被调用的服务(如media模块),只需作为普通的Spring Boot应用运行,提供REST API即可,无需使用@EnableFeignClients

  3. 如何在服务间共享数据模型(如DTOs)而不引入不必要的依赖。
    解决这个问题的一种方法是创建一个共享的库或模块,这个库包含所有服务共享的数据模型。另一种使用依赖剥离,使用通配符(*)可以排除pom.xml中特定依赖的所有传递性依赖。

Docker部署xxl-job调度器并结合SpringBoot测试

Docker部署xxl-job调度器并结合SpringBoot测试

一、Docker部署

1. 创建数据库

去Github下载最新发布的源码,https://github.com/xuxueli/xxl-job/releases,找到/xxl-job/doc/db/tables_xxl_job.sql文件,对数据库进行执行即可,脚本里面包含数据库的创建。

2. 启动容器

参考官方中文文档,写出如下docker-compose示例。使用-e PARAMS: ""来指定一些变量,包括数据库信息,一般需要根据自身情况修改。

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
version: "3.8"
networks:
docker_xuecheng:
ipam:
config:
- subnet: 172.20.0.0/16

services:
xxl-job:
container_name: xxl-job
image: xuxueli/xxl-job-admin:2.4.0
volumes:
- ./xxl_job/logs:/data/applogs
ports:
- "8088:8080"
environment:
PARAMS: '
--spring.datasource.url=jdbc:mysql://172.20.0.2:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
--spring.datasource.username=root
--spring.datasource.password=1009'
networks:
docker_xuecheng:
ipv4_address: 172.20.3.1

3. 访问

访问http://192.168.101.65:8088/xxl-job-admin/即可。

4. 新建执行器

新增一个简单的testHandler执行器。

二、SpringBoot整合

1. 模块注册到执行器

在对应模块引入依赖

language-xml
1
2
3
4
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>

并指定执行器的appname

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
xxl:
job:
admin:
addresses: http://192.168.101.65:8088/xxl-job-admin
executor:
appname: testHandler
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token

2. 创建配置类

在源码中找到src/main/java/com/xxl/job/executor/core/config/XxlJobConfig.java,复制到模块代码中。如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.xuecheng.media.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {

private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {

logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}

/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/


}

3. 启动测试

重启模块,访问XXL-JOB网页端,查看情况。如果执行器的OnLine 机器地址有一个信息,表示模块绑定成功。

三、任务发布-普通任务

1. 编写任务代码

源代码中有任务代码示例,路径为src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java,仿照写一个简单的任务,如下。

language-java
1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class SampleXxlJob {

/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {

System.out.println("处理视频");
}
}

2. 创建任务

选择执行器,并指定JobHandler

3. 启动任务

启动刚才创建的任务

对应模块的日志可以看到每10秒打印一次输出。

XXL-JOB网页管理也可以看到相关任务执行记录。

四、任务发布-分片任务

1. 编写任务代码

language-java
1
2
3
4
5
6
7
8
9
10
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

System.out.println("分片参数:当前分片序号 = " + shardIndex + ", 总分片数 = " + shardTotal);

}

2. 启动多个实例

添加虚拟机参数-Dserver.port=63051 -Dxxl.job.executor.port=9998,前者区分程序端口,后者区分执行器端口。

3. 创建任务

创建任务之前,检查一下两个模块是否注册到指定执行器。

随后创建任务,指定执行器JobHandler,同时路由策略选择分片广播

4. 启动任务

启动任务后,观察两个模块的日志。

同时任务记录也在XXL-JOB管理网页中可以查询到。

五、动态扩容

当运行分片任务时,又添加一个新的模块示例,此时分片任务会自动扩容再分配。如图,我们再复制一个运行配置。

然后将其运行,等待一会,执行器可以看到有3个绑定的机器。

新增的运行实例日志如下,

同时,先前两个运行实例的日志发送了变化,如下

参考资料

Docker多节点部署Minio分布式文件系统并测试

Docker多节点部署Minio分布式文件系统并测试

一、前提准备

准备如下文件夹和文件

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
./
├── docker-compose-minio.yml
├── .env
├── env
│ ├── minio.env
├── minio
│ ├── minio1
│ │ ├── data1
│ │ └── data2
│ ├── minio2
│ │ ├── data1
│ │ └── data2
│ ├── minio3
│ │ ├── data1
│ │ └── data2
│ └── minio4
│ ├── data1
│ └── data2

二、文件配置

1. .env

language-env
1
MINIO_VERSION=RELEASE.2024-01-29T03-56-32Z

2. env/minio.env

language-env
1
2
MINIO_ROOT_USER=minio
MINIO_ROOT_PASSWORD=minio123

3. docker-compose-minio.yml

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
version: "3.8"
networks:
docker_xuecheng:
ipam:
config:
- subnet: 172.20.0.0/16

services:
minio1:
container_name: minio1
image: minio/minio:${
MINIO_VERSION}
volumes:
- ./minio/minio1/data1:/data1
- ./minio/minio1/data2:/data2
ports:
- "9001:9000"
- "9011:9001"
env_file:
- ./env/minio.env
command: server --address ":9000" --console-address ":9001" http://172.20.2.{
1...4}/data{
1...2}
networks:
docker_xuecheng:
ipv4_address: 172.20.2.1

minio2:
container_name: minio2
image: minio/minio:${
MINIO_VERSION}
volumes:
- ./minio/minio2/data1:/data1
- ./minio/minio2/data2:/data2
ports:
- "9002:9000"
- "9012:9001"
env_file:
- ./env/minio.env
command: server --address ":9000" --console-address ":9001" http://172.20.2.{
1...4}/data{
1...2}
networks:
docker_xuecheng:
ipv4_address: 172.20.2.2

minio3:
container_name: minio3
image: minio/minio:${
MINIO_VERSION}
volumes:
- ./minio/minio3/data1:/data1
- ./minio/minio3/data2:/data2
ports:
- "9003:9000"
- "9013:9001"
env_file:
- ./env/minio.env
command: server --address ":9000" --console-address ":9001" http://172.20.2.{
1...4}/data{
1...2}
networks:
docker_xuecheng:
ipv4_address: 172.20.2.3

minio4:
container_name: minio4
image: minio/minio:${
MINIO_VERSION}
volumes:
- ./minio/minio4/data1:/data1
- ./minio/minio4/data2:/data2
ports:
- "9004:9000"
- "9014:9001"
env_file:
- ./env/minio.env
command: server --address ":9000" --console-address ":9001" http://172.20.2.{
1...4}/data{
1...2}
networks:
docker_xuecheng:
ipv4_address: 172.20.2.4

三、测试

访问宿主机ip:9011,输入账号密码。

language-txt
1
2
MINIO_ROOT_USER=minio
MINIO_ROOT_PASSWORD=minio123

点到Monitoring -> Metrics

四、Java测试

1. 引入依赖

language-xml
1
2
3
4
5
6
7
8
9
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

2. 增删改

在这之前先去网页端,创建一个Bucket

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.xuecheng.media;

import io.minio.*;
import io.minio.errors.*;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.junit.jupiter.api.Test;

import java.io.*;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

public class MinioTest {

private MinioClient minioClient = MinioClient.builder()
.endpoint("http://192.168.101.65:9001") //改成你的宿主机ip
.credentials("minio", "minio123")
.build();

@Test
public void testCreate() throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {

ObjectWriteResponse file = minioClient.uploadObject(
UploadObjectArgs.builder()
.bucket("test")
.filename("C:\\Users\\mumu\\Desktop\\1C6091EF9671978A9F1B6C6F8A3666FD.png")
.object("1.png")
.build()
);
}

@Test
public void testDelete() throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {

minioClient.removeObject(
RemoveObjectArgs.builder()
.bucket("test")
.object("12.msi")
.build()
);
}

@Test
public void testGet() throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {

InputStream inputStream = minioClient.getObject(
GetObjectArgs.builder()
.bucket("test")
.object("1.png")
.build()
);
FileOutputStream outputStream = new FileOutputStream(new File("C:\\Users\\mumu\\Desktop\\2.png"));
IOUtils.copy(inputStream, outputStream);
}

}

SpringCloud Gateway(4.1.0) 返回503:原因分析与解决方案

SpringCloud Gateway(4.1.0) 返回503:原因分析与解决方案

一、环境版本

Version
spring-cloud-dependencies 2023.0.0
spring-cloud-starter-gateway 4.1.0
Nacos v2.3.0

二、原因分析

Spring Cloud Gateway 的早期版本中,Ribbon 被用作默认的负载均衡器。随着Spring Cloud的发展,RibbonSpring Cloud LoadBalancer 替代。在过渡期间,为了兼容,Spring Cloud 同时支持了 RibbonSpring Cloud LoadBalancer。然而,从 Spring Cloud 2020.0.0 版本开始,Ribbon 被废弃,Spring Cloud LoadBalancer 成为了推荐的负载均衡方案。

在这个版本变动中,为了提供更大的灵活性,spring-cloud-starter-loadbalancer 被标记为了可选依赖,不再默认包含在 Spring Cloud Gateway 中。因此,在使用 4.1.0 版本的 Spring Cloud Gateway 并需要服务发现和负载均衡功能时,如果没有显式包含这个依赖,就会导致无法处理 lb://URI,从而返回503错误。

三、解决方案

要解决这个问题,您需要在您的项目的 POM 文件中显式添加 spring-cloud-starter-loadbalancer 依赖:

language-xml
1
2
3
4
5
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<version>4.1.0</version>
</dependency>

添加后,确保重启应用程序以使配置生效。这样,Spring Cloud Gateway 就能够正确处理基于服务发现的负载均衡,从而避免503错误。

通过理解 Spring Cloud 的历史演变和适应其依赖管理的变化,我们可以更好地维护和优化我们的微服务架构。

SpringCloud + Nacos配置文件加载顺序和优先级详解

SpringCloud + Nacos配置文件加载顺序和优先级详解

在微服务架构中,合理地管理和理解配置文件的加载顺序与优先级对于确保应用的稳定性和灵活性至关重要。特别是在使用 Spring Cloud Alibaba Nacos 作为配置中心的场景下,这一点显得尤为重要。本文将基于一个具体的 bootstrap.yml 配置示例,深入探讨这些概念,并介绍如何通过 Nacos 配置实现本地配置的优先级设置。

一、加载顺序与优先级

1. 示例配置

首先,我们看一下示例的 bootstrap.yml 配置:

language-yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
spring:
application:
name: content-api
cloud:
nacos:
server-addr: 192.168.101.65:8848
discovery:
namespace: dev
group: xuecheng-plus-project
config:
namespace: dev
group: xuecheng-plus-project
file-extension: yaml
refresh-enabled: true
extension-configs:
- data-id: content-service-${
spring.profiles.active}.yaml
group: xuecheng-plus-project
refresh: true
shared-configs:
- data-id: swagger-${
spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
- data-id: logging-${
spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true
profiles:
active: dev

2. 配置文件分类

Spring Cloud Alibaba Nacos 环境中,我们主要遇到以下类型的配置文件:

  1. 本地配置文件

    • bootstrap.yml / bootstrap.yaml
    • application.yml / application.yaml
  2. Nacos 配置中心的配置文件

    • 共享配置文件 (shared-configs)
    • 扩展配置文件 (extension-configs)
    • 项目应用名配置文件 (${spring.application.name}.yaml / .properties)

3. 加载顺序

  1. **bootstrap.yml / bootstrap.yaml**:首先加载,用于配置应用的启动环境。
  2. Nacos 配置中心的配置文件
    • 先加载 共享配置文件 (shared-configs)
    • 然后是 扩展配置文件 (extension-configs)
    • 最后是 项目应用名配置文件 (${spring.application.name}.yaml / .properties)
  3. **application.yml / application.yaml**:在 Nacos 配置加载之后。

4. 优先级

  1. 项目应用名配置文件:具有最高优先级。
  2. 扩展配置文件:次之,覆盖共享配置。
  3. 共享配置文件:优先级低于扩展配置。
  4. **本地 application.yml / application.yaml**:优先级低于所有从 Nacos 加载的配置。
  5. **本地 bootstrap.yml / bootstrap.yaml**:优先级最低。

二、本地配置优先的设置

Nacos 中,可以通过特定的配置来设置本地配置优先。这可以在 bootstrap.ymlapplication.yml 文件中设置:

language-yaml
1
2
3
4
spring:
cloud:
config:
override-none: true

override-none 设置为 true 时,本地配置文件 (application.yml / application.yaml) 将具有最高的优先级,即使这些配置在 Nacos 中也有定义。这种设置适用于需要在不同环境中覆盖远程配置中心配置的场景。

结论

了解和正确应用 Spring Cloud Alibaba Nacos 中配置文件的加载顺序和优先级,对于确保微服务的正确运行至关重要。此外,通过配置 override-nonetrue,可以灵活地实现本地配置优先的需求,进一步增强了配置管理的灵活性。这些特性使得 Spring Cloud Alibaba Nacos 成为管理微服务配置的强大工具。

RabbitMQ常见问题之高可用

RabbitMQ常见问题之高可用

一、集群分类

RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两
种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ3.8版本
以后,推出了新的功能——仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

二、普通集群搭建

1. 准备

建立如下文件夹结构

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
./cluster/
├── docker-compose.yml
├── mq1
│ ├── .erlang.cookie
│ └── conf
│ └── rabbitmq.conf
├── mq2
│ ├── .erlang.cookie
│ └── conf
│ └── rabbitmq.conf
└── mq3
├── .erlang.cookie
└── conf
└── rabbitmq.conf

2. 配置

rabbitmq.conf都写入以下内容

language-shell
1
2
3
4
5
6
7
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
vm_memory_high_watermark.absolute = 524288000

.erlang.cookie都写入以下内容

language-txt
1
SUGWXEQPRCPYJAVYPNZY

集群的所有节点的.erlang.cookie需要保持一致才能互相信任,具体内容并不固定,可以随便新建一个rabbitmq容器去查看其.erlang.cookie然后复制使用即可。

docker-compose.yml写入以下内容

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
version: "3.8"

networks:
rabbitmq-normal-cluster:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.30.3.0/24

services:
mq1:
container_name: mq1
hostname: mq1
image: rabbitmq:3-management
environment:
- RABBITMQ_DEFAULT_USER=rabbitmq
- RABBITMQ_DEFAULT_PASS=rabbitmq
volumes:
- ./mq1/conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie:ro
ports:
- "8071:5672"
- "8081:15672"
networks:
rabbitmq-normal-cluster:
ipv4_address: 172.30.3.11

mq2:
container_name: mq2
hostname: mq2
image: rabbitmq:3-management
environment:
- RABBITMQ_DEFAULT_USER=rabbitmq
- RABBITMQ_DEFAULT_PASS=rabbitmq
volumes:
- ./mq2/conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./mq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie:ro
ports:
- "8072:5672"
- "8082:15672"
networks:
rabbitmq-normal-cluster:
ipv4_address: 172.30.3.12

mq3:
container_name: mq3
hostname: mq3
image: rabbitmq:3-management
environment:
- RABBITMQ_DEFAULT_USER=rabbitmq
- RABBITMQ_DEFAULT_PASS=rabbitmq
volumes:
- ./mq3/conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./mq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie:ro
ports:
- "8073:5672"
- "8083:15672"
networks:
rabbitmq-normal-cluster:
ipv4_address: 172.30.3.13

3. 运行

language-bash
1
docker-compose -p rabbitmq-c up -d

三、镜像集群

1. 介绍

镜像模式的配置有3种模式:

ha-mode ha-params 效果
准确模式exactly 队列的副本量count 集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
all (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
nodes node names 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。

2. 启用方式

三种模式启动方式分别如下,基于普通集群之上,命令均需要在单个容器内部执行。

language-bash
1
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
language-bash
1
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
language-bash
1
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

3. 测试

这里以exactly为例,在mq1中执行rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'后,所有前缀为twoqueue都会有1个主queue和1个副本。

language-txt
1
2
3
root@mq1:/# rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
Setting policy "ha-two" for pattern "^two\." to "{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}" with priority "0" for vhost "/" ...
root@mq1:/#

来到localhost:8081管理页,找到admin->policies可以看到策略生效。

来新建一个two.test.queue,可以看到这是一个拥有副本的queue

四、仲裁队列

1. 介绍

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致

上一章中想要一个镜像队列还要执行各种命令,遵循规定,现在不用了。

2. 创建

java使用目前只能基于@Bean创建

language-java
1
2
3
4
5
6
7
8
@Bean
public Queue quorumQueue(){

return QueueBuilder
.durable("quorum.queue2")
.quorum()
.build();
}

五、Java连接RabbitMQ集群方式

Java使用RabbitMQ集群application.yml中需要修改address

language-yml
1
2
3
4
spring:
rabbitmq:
host: localhost # rabbitMQ的ip地址
port: 5672 # 端口
language-yml
1
2
3
spring:
rabbitmq:
addresses: localhost:8071, localhost:8072, localhost:8073
RabbitMQ常见问题之消息堆积

RabbitMQ常见问题之消息堆积

一、介绍

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最
早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

解决消息堆积有三种种思路:

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积,提高堆积上限

RabbitMQ3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非內存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

二、使用惰性队列

1. 基于@Bean

language-java
1
2
3
4
5
6
7
8
@Bean
public Queue lazyQueue(){

return QueueBuilder
.durable("lazy.queue")
.lazy()
.build();
}

2. 基于@RabbitListener

language-java
1
2
3
4
5
6
7
8
9
    @RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "lazy.queue", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
exchange = @Exchange(name = "simple.exchange"),
key = "lazy"
))
public void listenLazyExchange(String msg){

// log.info("消费者接收到lazy.queue的消息:【" + msg + "】");
}
RabbitMQ常见问题之延迟消息

RabbitMQ常见问题之延迟消息

一、死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而
这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

二、TTL

如果messagequeue都有ttl,采用更小的一方。

1. Queue指定死信交换机并设置TTL

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
public class CommonConfig {

@Bean
public DirectExchange ttlExchange(){

return new DirectExchange("ttl.direct");
}

@Bean
public Queue ttlQueue(){

return QueueBuilder
.durable("ttl.queue")
.ttl(10000)
.deadLetterExchange("dl.direct")
.deadLetterRoutingKey("dl")
.build();
}

@Bean
public Binding ttlBinding(){

return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}

2. 消息设置TTL

language-java
1
2
3
4
5
6
7
8
9
10
@Test
public void testTTLMessage(){

Message message = MessageBuilder.withBody("hello ttl".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setExpiration("5000")
.build();
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
log.info("ttl消息已发送");
}

借助TTL机制可以用死信交换机模拟延迟队列,但是设计上比较牵强,性能不好。

三、延迟队列

这是官方提供的一些额外插件
https://www.rabbitmq.com/community-plugins.html

下载其中的DelayExchange插件,把.ez文件挂载到RabbitMQ容器的/plugins目录下,然后进入容器,执行

language-bash
1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
root@7c4ba266e5bc:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@7c4ba266e5bc:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@7c4ba266e5bc...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange

started 1 plugins.

1. SpringAMQP创建延迟队列

基于@RabbitListener或者基于@Bean都可以。

language-java
1
2
3
4
5
6
7
8
9
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg){

log.info("消费者接收到delay.queue的延迟消息:【" + msg + "】");
}

2. 设置消息延迟

这个插件只能在消息上设置延迟时间,没有队列设置延迟时间的概念,不过都是一样的。
message要在Header上添加一个x-delay

language-java
1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void testDelayMessage(){

Message message = MessageBuilder.withBody("hello delay".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", 5000)
.build();
// confirm callback
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.info("发送消息成功");
}

3. 测试

直接运行测试,可能会报错,因为rabbitmq意识到消息到了exchange却没有立即到queue,被认为错误,回调returnback,所以我们在ReturnCallBack中绕过这个限制。

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {


@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{

//check if is delay message
if (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) {

return;
}
log.error("消息发送到queue失败,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",
replyCode, replyText, exchange, routingKey, message.toString());
});
}
}

运行Test测试,可以看到Test方面,消息发送的时间为21:09:13

language-txt
1
2
21:09:13:516  INFO 25468 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2063c53e:0/SimpleConnection@6415f61e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 62470]
21:09:13:557 INFO 25468 --- [ main] cn.itcast.mq.spring.SpringAmqpTest : 发送消息成功

listener方面消息消费的时间为21:09:18,刚好5s。

language-txt
1
2
21:08:31:952  INFO 19532 --- [           main] cn.itcast.mq.ConsumerApplication         : Started ConsumerApplication in 1.735 seconds (JVM running for 2.357)
21:09:18:583 INFO 19532 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消费者接收到delay.queue的延迟消息:【hello delay】
RabbitMQ常见问题之消息可靠性

RabbitMQ常见问题之消息可靠性

一、介绍

MQ的消息可靠性,将从以下四个方面展开并实践:

  1. 生产者消息确认
  2. 消息持久化
  3. 消费者消息确认
  4. 消费失败重试机制

二、生产者消息确认

对于publisher,如果message到达exchange与否,rabbitmq提供publiser-comfirm机制,如果message达到exchange但是是否到达queuerabbitmq提供publisher-return机制。这两种机制在代码中都可以通过配置来自定义实现。

以下操作都在publisher服务方完成。

1. 引入依赖

language-xml
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
language-yaml
1
2
3
4
5
6
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true

配置说明:
publish-confirm-type:开启publisher-confirm,这里支持两种类型:

  • simple:同步等待confirm结果,直到超时
  • correlated:异步回调,定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback

publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback; false,则直接丢弃消息

2. 配置ReturnCallBack

每个RabbitTemplate只能配置一个ReturnCallBack,所以直接给IoC里面的RabbitTemplate配上,所有人都统一用。
新建配置类,实现ApplicationContextAware 接口,在接口中setReturnCallback

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {


@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{

//check if is delay message
if (message.getMessageProperties().getReceivedDelay() != null && message.getMessageProperties().getReceivedDelay() > 0) {

return;
}
log.error("消息发送到queue失败,replyCode={}, reason={}, exchange={}, routeKey={}, message={}",
replyCode, replyText, exchange, routingKey, message.toString());
});
}
}

3. 配置ConfirmCallBack

ConfirmCallBack在message发送时配置,每个message都可以有自己的ConfirmCallBack。

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {

String message = "hello, spring amqp!";
// confirm callback
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(
result -> {

if (result.isAck()){

log.debug("消息到exchange成功, id={}", correlationData.getId());
}else {

log.error("消息到exchange失败, id={}", correlationData.getId());
}
},
throwable -> {

log.error("消息发送失败", throwable);
}
);

rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
}

4. 测试

将消息发送到一个不存在的exchange,模拟消息达到exchange失败,触发ConfirmCallBack,日志如下。

language-txt
1
2
18:22:03:913 ERROR 23232 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aamq.topic' in vhost '/', class-id=60, method-id=40)
18:22:03:915 ERROR 23232 --- [nectionFactory1] cn.itcast.mq.spring.SpringAmqpTest : 消息到exchange失败, id=0c0910a3-7937-43ea-9606-e5bbcdda0b5c

将消息发送到一个存在的exchange,但routekey异常,模拟消息到达exchange但没有到达queue,触发ConfirmCallBackReturnCallBack,日志如下。

language-txt
1
2
3
18:27:22:757  INFO 20184 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#7428de63:0/SimpleConnection@6d60899e [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 53662]
18:27:22:797 DEBUG 20184 --- [ 127.0.0.1:5672] cn.itcast.mq.spring.SpringAmqpTest : 消息到exchange成功, id=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11
18:27:22:796 ERROR 20184 --- [nectionFactory1] cn.itcast.mq.config.CommonConfig : 消息发送到queue失败,replyCode=312, reason=NO_ROUTE, exchange=amq.topic, routeKey=simplee.test, message=(Body:'hello, spring amqp!' MessageProperties [headers={spring_returned_message_correlation=5fbdaaa1-5f20-4683-bdfa-bd71cd6afd11}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])

三、消息持久化

新版本的SpringAMQP默认开启持久化。RabbitMQ本身并不默认开启持久化。

队列持久化,通过QueueBuilder构建持久化队列,比如

language-java
1
2
3
4
5
6
7
@Bean
public Queue simpleQueue(){

return QueueBuilder
.durable("simple.queue")
.build();
}

消息持久化,在发送时可以设置,比如

language-java
1
2
3
4
5
6
7
8
@Test
public void testDurableMessage(){

Message message = MessageBuilder.withBody("hello springcloud".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend("simple.queue", message);
}

四、消费者消息确认

消费者消息确认是指,consumer收到消息后会给rabbitmq发送回执来确认消息接收状况。

SpringAMQP允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack, MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
language-yaml
1
2
3
4
5
6
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto # manual auto none

但是auto有个很大的缺陷,因为rabbitmq会自动不断给有问题的listen反复投递消息,导致不断报错,所以建议使用下一章的操作。

五、消费失败重试机制

当消费者出现异常后,消息会不断requeue (重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue
,无限循环,导致mq的消息处理飙升,带来不必要的压力。

我们可以利用Springretry机制,在消费者出现异常时利用本地重试,而不是无限制的requeuemq队列。

1. 引入依赖

language-yml
1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 #初识的失败等待时长为1秒
multiplier: 2 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2. 配置重试次数耗尽策略

我们采用RepublishMessageRecoverer
定义用于接收失败消息的exchangequeue以及它们之间的bindings

然后定义MessageRecoverer,比如

language-java
1
2
3
4
5
6
7
8
9
@Component
public class ErrorMessageConfig {

@Bean
public MessageRecoverer republishMessageRecover(RabbitTemplate rabbitTemplate){

return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}
}

3. 测试

定义处理异常消息的exchangequeue,比如

language-java
1
2
3
4
5
6
7
8
9
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "error.queue"),
exchange = @Exchange(name = "error.exchange"),
key = "error"
))
public void listenErrorQueue(String msg){

log.info("消费者接收到error.queue的消息:【" + msg + "】");
}

定义如下一个listener,来模拟consumer处理消息失败触发消息重试。

language-java
1
2
3
4
5
6
7
8
9
10
11
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "simple.queue"),
exchange = @Exchange(name = "simple.exchange"),
key = "simple"
))
public void listenSimpleQueue(String msg) {

log.info("消费者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1/0);
log.info("consumer handle message success");
}

写一个简单的测试,往simple.exchange发送消息,比如

language-java
1
2
3
4
5
6
@Test
public void testSendMessageSimpleQueue() throws InterruptedException {

String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend("simple.exchange", "simple", message);
}

运行测试,consumer得到以下日志

language-txt
1
2
3
4
5
18:51:10:164  INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener     : 消费者接收到simple.queue的消息:【hello, spring amqp!】
18:51:11:167 INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消费者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:168 INFO 24072 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消费者接收到simple.queue的消息:【hello, spring amqp!】
18:51:13:176 WARN 24072 --- [ntContainer#0-1] o.s.a.r.retry.RepublishMessageRecoverer : Republishing failed message to exchange 'error.exchange' with routing key error
18:51:13:181 INFO 24072 --- [ntContainer#1-1] c.i.mq.listener.SpringRabbitListener : 消费者接收到error.queue的消息:【hello, spring amqp!】

可以看到spring尝试2次重发,一共3次,第一次间隔1秒,第二次间隔2秒,重试次数耗尽,消息被consumer传入error.exchange,注意,是consumer传的,不是simple.queue