Kafka基础组件图推演

Kafka基础组件图推演

1. Controller Broker

Kafka集群中有一个Controller Broker,负责元数据管理和协调。

Kafka使用Zookeeper作为集群元数据的存储和管理工具。Zookeeper保存了集群的状态信息,包括所有的Topic、分区、Leader和副本信息等。

当集群状态发生变化时,Controller Broker会将变更信息写入Zookeeper。

当现任Controller Broker发生故障时,Kafka集群中的其他Broker会检测到这一情况,并通过Zookeeper进行选举。

一个Broker成功竞选为新的Controller Broker后,会从Zookeeper读取最新的集群元数据。

保障机制

  • 持久化元数据:Zookeeper持久化存储集群的元数据,确保在任何时刻都可以获取到最新的状态信息。
  • 心跳机制:Broker与Zookeeper之间通过心跳机制保持连接,快速检测Controller Broker的故障。
  • 选举机制:通过Zookeeper的选举机制,能够快速选举出新的Controller Broker,并从Zookeeper同步元数据。

2. 组件架构

1. Log Manager

LogManager主要负责管理Kafka日志(log)的存储和检索。

比如:生产者将消息发送到Partition0的Leader Broker1。LogManager在Broker1上将消息写入Partition0的日志文件。

2. Replication Manager

ReplicationManager主要负责管理分区数据的复制和同步。

每个分区的Leader和Follower之间的同步是独立进行的。也就是说,每个分区都有自己的同步过程,不依赖于其他分区。

虽然每个分区的同步过程是独立的,但每个Broker会为它所管理的每个分区(无论是Leader还是Follower)启动相应的复制线程,这些线程负责处理具体的同步任务。

比如:ReplicationManager在Broker1上将新写入的消息推送到Partition0的Follower Broker2和Broker3。ReplicationManager在Broker2和Broker3上处理从Broker1接收的复制请求,将消息写入它们各自的日志文件。

3. SocketServer

SocketServer是Kafka Broker中的一个组件,负责处理网络连接和I/O操作。它负责接受来自客户端和其他Broker的连接请求,并为每个连接分配一个线程进行处理。

4. NetworkServer

NetworkServer是Kafka的网络通信框架的一个核心部分,负责管理和调度网络请求。它使用了NIO(非阻塞I/O)来处理高并发的网络连接。

5. ZKClient

与Zookeeper通信的组件。

Kafka基础框架图推演

Kafka基础框架图推演

1. 单点模型

1. 名词概念

  1. Broker:是指Kafka集群中的一个节点。一个Kafka集群由多个Broker组成,这些Broker共同协作来处理消息的存储、传输和消费。Broker管理一个或多个分区。

  2. Topic:生产者将消息发送到指定的Topic,消费者订阅Topic以获取消息。Topic本身只是一个逻辑上的分组,不具有物理存储的概念。

  3. Partition:是Topic的子集,是Kafka中实际存储和处理消息的基本单元。每个Topic可以分成多个Partition,每个Partition都是一个有序的、不可变的消息序列。

  4. Replica:分区可以存在多副本。

  5. Leader Broker:分区的多副本下,负责处理所有的读写请求的分区所处的broker。

  6. FollowerBroker:分区的多副本下,负责同步Leader的数据的分区所处的broker。

生产者把消息(record)发送到kafka,消费者通过偏移量(offset,类似数组的下标)获取数据。

同时每个分区会有自己的Log文件,kafka使用log文件来把数据保存到磁盘。

2. 分布式集群-横向扩展

1. Topic多分区

关于生产

生产者通过Bootstrap Broker连接到Kafka集群。这一步是为了建立初始连接,并获取集群的元数据。

一旦生产者获取了这些元数据,它就知道每个分区的Leader Broker是谁,从而可以将消息直接发送到正确的Leader Broker。

生产者发送消息时必须指定Topic,但是分区是可选的。

  • 不指定分区:如果生产者没有手动指定分区,Kafka会根据默认的分区策略将消息分配到分区。默认的分区策略如下:
    • 如果消息有键(Key),Kafka会根据键的哈希值来确定分区。相同的键总是被分配到同一个分区。
    • 如果消息没有键(Key),Kafka会使用轮询或随机的方式将消息分配到分区,以确保消息分布均匀。
  • 指定分区:生产者也可以在发送消息时明确指定分区。这样,消息会直接发送到指定的分区。

在Kafka中,生产者将消息发送到Broker时,Broker的第一个操作就是将消息记录到磁盘中,以确保消息的持久性和可靠性。

关于消费

Kafka中的消费者通常属于一个消费者组(Consumer Group)。每个消费者组有一个唯一的组ID。消费者组的概念用于实现消息的负载均衡和并行消费。

当多个消费者属于同一个组时,Kafka会将Topic的分区分配给组内的消费者。**每个分区只能由组内的一个消费者消费**,这样可以实现负载均衡。

  • 单个消费者订阅一个Topic

    • 如果只有一个消费者订阅了一个Topic,那么该消费者会接收到该Topic中的所有消息。
  • 多个消费者属于同一个组

    • Topic中的分区会在组内的消费者之间进行分配。每个分区只会被组内的一个消费者消费。
    • 如果消费者数量多于分区数,多余的消费者将不会分配到任何分区,处于空闲状态。这些消费者可以在有其他消费者退出时自动接管其分区,从而实现高可用性。
    • 如果消费者数量少于分区数,有些消费者会被分配多个分区。
  • 多个消费者属于不同的组

    • 每个组都会独立消费Topic中的所有消息。也就是说,消息会被广播到所有组中的消费者。

关于分区新增

Kafka会在集群中创建新的分区。这些新的分区会被分配到不同的Broker,以实现数据的均衡存储和高可用性。Kafka不会自动将现有分区的数据重新分配或均衡到新的分区。新的分区从创建时开始是空的,只有在后续生产者发送消息时,才会向这些新的分区写入数据。消费者组会感知到分区数量的变化,并触发重新平衡。

2. 分区多副本

Kafka允许每个分区有多个副本(Replica),这些副本存储在不同的Broker上。一个副本被称为Leader,负责处理所有的读写请求,其他副本为Follower,负责同步Leader的数据。

多个副本同时只有一个副本可以读写,这就是Leader副本,其他副本成为Follower副本,用作备份。

微服务OAuth 2.1认证授权可行性方案(Spring Security 6)

微服务OAuth 2.1认证授权可行性方案(Spring Security 6)

一、背景

Oauth2停止维护,基于OAuth 2.1OpenID Connect 1.0Spring Authorization Server模块独立于SpringCloud

本文开发环境如下:

Version
Java 17
SpringCloud 2023.0.0
SpringBoot 3.2.1
Spring Authorization Server 1.2.1
Spring Security 6.2.1
mysql 8.2.0

https://spring.io/projects/spring-security#learn
https://spring.io/projects/spring-authorization-server#learn

二、微服务架构介绍

一个认证服务器(也是一个微服务),专门用于颁发JWT。
一个网关(也是一个微服务),用于白名单判断和JWT校验。
若干微服务。

本文的关键在于以下几点:

  • 搭建认证服务器
  • 网关白名单判断
  • 网关验证JWT
  • 认证服务器如何共享公钥,让其余微服务有JWT自校验的能力。

三、认证服务器

这里是官方文档https://spring.io/projects/spring-authorization-server#learn
基本上跟着Getting Started写完就可以。

1. 数据库创建

新建一个数据库xc_users
然后执行jar里自带的三个sql

这一步官方并没有给出,大概因为可以使用内存存储,在简单demo省去了持久化。不建立数据库可能也是可行的,我没试过。

2. 新建模块

新建一个auth模块,作为认证服务器。

3. 导入依赖和配置

language-xml
1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-oauth2-authorization-server</artifactId>
</dependency>
language-yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
server:
servlet:
context-path: /auth
port: 63070
spring:
application:
name: auth-service
profiles:
active: dev
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.101.65:3306/xc_users?serverTimezone=UTC&userUnicode=true&useSSL=false&
username: root
password: 1009

4. 安全认证配置类

language-java
1
2
3
4
@Configuration
@EnableWebSecurity
public class AuthServerSecurityConfig {
}

里面包含诸多内容,有来自Spring Security的,也有来自的Spring Authorization Server的。

  1. UserDetailsService 的实例,用于检索用户进行身份验证。
language-java
1
2
3
4
5
6
7
8
9
10
@Bean
public UserDetailsService userDetailsService() {

UserDetails userDetails = User
.withUsername("lisi")
.password("456")
.roles("read")
.build();
return new InMemoryUserDetailsManager(userDetails);
}
  1. 密码编码器(可选,本文不用)
language-java
1
2
3
4
5
6
7
8
    @Bean
public PasswordEncoder passwordEncoder() {

// 密码为明文方式
return NoOpPasswordEncoder.getInstance();
// 或使用 BCryptPasswordEncoder
// return new BCryptPasswordEncoder();
}
  1. 协议端点的 Spring Security 过滤器链
language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Bean
@Order(1)
public SecurityFilterChain authorizationServerSecurityFilterChain(HttpSecurity http)
throws Exception {

OAuth2AuthorizationServerConfiguration.applyDefaultSecurity(http);
http.getConfigurer(OAuth2AuthorizationServerConfigurer.class)
.oidc(Customizer.withDefaults()); // Enable OpenID Connect 1.0
http

// Redirect to the login page when not authenticated from the
// authorization endpoint
.exceptionHandling((exceptions) -> exceptions
.defaultAuthenticationEntryPointFor(
new LoginUrlAuthenticationEntryPoint("/login"),
new MediaTypeRequestMatcher(MediaType.TEXT_HTML)
)
)
// Accept access tokens for User Info and/or Client Registration
.oauth2ResourceServer((resourceServer) -> resourceServer
.jwt(Customizer.withDefaults()));

return http.build();
}
  1. 用于身份验证的 Spring Security 过滤器链。
    至于哪些要校验身份,哪些不用,根据自己需求写。
language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Bean
@Order(2)
public SecurityFilterChain defaultFilterChain(HttpSecurity http) throws Exception {

http
.authorizeHttpRequests((authorize) ->
authorize
.requestMatchers(new AntPathRequestMatcher("/actuator/**")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/login")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/oauth2/**")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/**/*.html")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/**/*.json")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/auth/**")).permitAll()
.anyRequest().authenticated()
)
.formLogin(Customizer.withDefaults())
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt
.jwtAuthenticationConverter(jwtAuthenticationConverter())
)
);

return http.build();
}
  1. 自定义验证转化器(可选)
language-java
1
2
3
4
5
6
7
private JwtAuthenticationConverter jwtAuthenticationConverter() {

JwtAuthenticationConverter jwtConverter = new JwtAuthenticationConverter();
// 此处可以添加自定义逻辑来提取JWT中的权限等信息
// jwtConverter.setJwtGrantedAuthoritiesConverter(...);
return jwtConverter;
}
  1. 用于管理客户端的 RegisteredClientRepository 实例
language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
	@Bean
public RegisteredClientRepository registeredClientRepository() {

RegisteredClient registeredClient = RegisteredClient.withId(UUID.randomUUID().toString())
.clientId("XcWebApp")
// .clientSecret("{noop}XcWebApp")
.clientSecret("XcWebApp")
.clientAuthenticationMethod(ClientAuthenticationMethod.CLIENT_SECRET_BASIC)
.authorizationGrantType(AuthorizationGrantType.AUTHORIZATION_CODE)
.authorizationGrantType(AuthorizationGrantType.REFRESH_TOKEN)
.authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
.redirectUri("http://www.51xuecheng.cn")
// .postLogoutRedirectUri("http://localhost:63070/login?logout")
.scope("all")
.scope(OidcScopes.OPENID)
.scope(OidcScopes.PROFILE)
.scope("message.read")
.scope("message.write")
.scope("read")
.scope("write")
.clientSettings(ClientSettings.builder().requireAuthorizationConsent(true).build())
.tokenSettings(TokenSettings.builder()
.accessTokenTimeToLive(Duration.ofHours(2)) // 设置访问令牌的有效期
.refreshTokenTimeToLive(Duration.ofDays(3)) // 设置刷新令牌的有效期
.reuseRefreshTokens(true) // 是否重用刷新令牌
.build())
.build();

return new InMemoryRegisteredClientRepository(registeredClient);
}
  1. 用于对访问令牌进行签名的实例
language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Bean
public JWKSource<SecurityContext> jwkSource() {

KeyPair keyPair = generateRsaKey();
RSAPublicKey publicKey = (RSAPublicKey) keyPair.getPublic();
RSAPrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate();
RSAKey rsaKey = new RSAKey.Builder(publicKey)
.privateKey(privateKey)
.keyID(UUID.randomUUID().toString())
.build();
JWKSet jwkSet = new JWKSet(rsaKey);
return new ImmutableJWKSet<>(jwkSet);
}
private static KeyPair generateRsaKey() {

KeyPair keyPair;
try {

KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
keyPairGenerator.initialize(2048);
keyPair = keyPairGenerator.generateKeyPair();
}
catch (Exception ex) {

throw new IllegalStateException(ex);
}
return keyPair;
}
  1. 用于解码签名访问令牌的JwtDecoder 实例
language-java
1
2
3
4
5
@Bean
public JwtDecoder jwtDecoder(JWKSource<SecurityContext> jwkSource) {

return OAuth2AuthorizationServerConfiguration.jwtDecoder(jwkSource);
}
  1. 用于配置Spring Authorization ServerAuthorizationServerSettings 实例
language-java
1
2
3
4
5
@Bean
public AuthorizationServerSettings authorizationServerSettings() {

return AuthorizationServerSettings.builder().build();
}

这里可以设置各种端点的路径,默认路径点开builder()即可看到,如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static Builder builder() {

return new Builder()
.authorizationEndpoint("/oauth2/authorize")
.deviceAuthorizationEndpoint("/oauth2/device_authorization")
.deviceVerificationEndpoint("/oauth2/device_verification")
.tokenEndpoint("/oauth2/token")
.jwkSetEndpoint("/oauth2/jwks")
.tokenRevocationEndpoint("/oauth2/revoke")
.tokenIntrospectionEndpoint("/oauth2/introspect")
.oidcClientRegistrationEndpoint("/connect/register")
.oidcUserInfoEndpoint("/userinfo")
.oidcLogoutEndpoint("/connect/logout");
}

这里我必须吐槽一下,qnmd /.well-known/jwks.json,浪费我一下午。获取公钥信息的端点现在已经替换成了/oauth2/jwks。

四、认证服务器测试

基本上跟着Getting Started走就行。只不过端点的变动相较于Oauth2很大,还有使用方法上不同。

在配置RegisteredClient的时候,我们设置了三种GrantType,这里只演示两种AUTHORIZATION_CODECLIENT_CREDENTIALS

1. AUTHORIZATION_CODE(授权码模式)

1. 获取授权码

用浏览器打开以下网址,

language-txt
1
http://localhost:63070/auth/oauth2/authorize?client_id=XcWebApp&response_type=code&scope=all&redirect_uri=http://www.51xuecheng.cn

对应oauth2/authorize端点,后面的参数和当时设置RegisteredClient 保持对应就行。response_type一定是code
进入到登陆表单,输入lisi - 456登陆。

选择all,同意请求。

url被重定向到http://www.51xuecheng.cn,并携带一个code,这就是授权码。

language-txt
1
http://www.51xuecheng.cn/?code=9AexK_KFH1m3GiNBKsc0FU2KkedM2h_6yR-aKF-wPnpQT5USKLTqoZiSkHC3GUvt-56_ky-E3Mv5LbMeH9uyd-S1UV6kfJO6znqAcCAF43Yo4ifxTAQ8opoPJTjLIRUC

2. 获取JWT

使用apifox演示,postmanidea-http都可以。
localhost:63070/auth服务的/oauth2/token端点发送Post请求,同时需要携带认证信息。
认证信息可以如图所填的方法,也可以放到Header中,具体做法是将客户端ID和客户端密码用冒号(:)连接成一个字符串,进行Base64编码放入HTTP请求的Authorization头部中,前缀为Basic 。比如
Authorization: Basic bXlDbGllbnRJZDpteUNsaWVudFNlY3JldA==

得到JWT

2. CLIENT_CREDENTIALS(客户端凭证模式)

不需要授权码,直接向localhost:63070/auth服务的/oauth2/token端点发送Post请求,同时需要携带认证信息。

五、Gateway

至于gateway基础搭建步骤和gateway管理的若干微服务本文不做指导。

相较于auth模块(也就是Authorization Server),gateway的角色是Resource Server

1. 引入依赖

language-xml
1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-oauth2-resource-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>

2. 添加白名单文件

resource下添加security-whitelist.properties文件。
写入以下内容

language-properties
1
2
3
/auth/**=????
/content/open/**=??????????
/media/open/**=??????????

3. 全局过滤器

在全局过滤器中,加载白名单,然后对请求进行判断。

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Component
@Slf4j
public class GatewayAuthFilter implements GlobalFilter, Ordered {


//白名单
private static List<String> whitelist = null;

static {

//加载白名单
try (
InputStream resourceAsStream = GatewayAuthFilter.class.getResourceAsStream("/security-whitelist.properties");
) {

Properties properties = new Properties();
properties.load(resourceAsStream);
Set<String> strings = properties.stringPropertyNames();
whitelist= new ArrayList<>(strings);

} catch (Exception e) {

log.error("加载/security-whitelist.properties出错:{}",e.getMessage());
e.printStackTrace();
}
}


@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

String requestUrl = exchange.getRequest().getPath().value();
log.info("请求={}",requestUrl);
AntPathMatcher pathMatcher = new AntPathMatcher();
//白名单放行
for (String url : whitelist) {

if (pathMatcher.match(url, requestUrl)) {

return chain.filter(exchange);
}
}
}

private Mono<Void> buildReturnMono(String error, ServerWebExchange exchange) {

ServerHttpResponse response = exchange.getResponse();
String jsonString = JSON.toJSONString(new RestErrorResponse(error));
byte[] bits = jsonString.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = response.bufferFactory().wrap(bits);
response.setStatusCode(HttpStatus.UNAUTHORIZED);
response.getHeaders().add("Content-Type", "application/json;charset=UTF-8");
return response.writeWith(Mono.just(buffer));
}

@Override
public int getOrder() {

return 0;
}
}

4. 获取远程JWKS

yml配置中添加jwk-set-uri属性。

language-yml
1
2
3
4
5
6
spring:
security:
oauth2:
resourceserver:
jwt:
jwk-set-uri: http://localhost:63070/auth/oauth2/jwks

新建配置类,自动注入JwtDecoder

language-java
1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
public class JwtDecoderConfig {

@Value("${spring.security.oauth2.resourceserver.jwt.jwk-set-uri}")
String jwkSetUri;
@Bean
public JwtDecoder jwtDecoderLocal() {

return NimbusJwtDecoder.withJwkSetUri(jwkSetUri).build();
}
}

5. 校验JWT

在全局过滤器中补全逻辑。

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@Component
@Slf4j
public class GatewayAuthFilter implements GlobalFilter, Ordered {


@Lazy
@Autowired
private JwtDecoder jwtDecoderLocal;

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

String requestUrl = exchange.getRequest().getPath().value();
log.info("请求={}",requestUrl);
AntPathMatcher pathMatcher = new AntPathMatcher();
//白名单放行
for (String url : whitelist) {

if (pathMatcher.match(url, requestUrl)) {

return chain.filter(exchange);
}
}

//检查token是否存在
String token = getToken(exchange);
log.info("token={}",token);
if (StringUtils.isBlank(token)) {

return buildReturnMono("没有携带Token,没有认证",exchange);
}
// return chain.filter(exchange);
try {

Jwt jwt = jwtDecoderLocal.decode(token);
// 如果没有抛出异常,则表示JWT有效

// 此时,您可以根据需要进一步检查JWT的声明
log.info("token有效期至:{}", formatInstantTime(jwt.getExpiresAt()));
return chain.filter(exchange);
} catch (JwtValidationException e) {

log.info("token验证失败:{}",e.getMessage());
return buildReturnMono("认证token无效",exchange);
}
}

/**
* 从请求头Authorization中获取token
*/
private String getToken(ServerWebExchange exchange) {

String tokenStr = exchange.getRequest().getHeaders().getFirst("Authorization");
if (StringUtils.isBlank(tokenStr)) {

return null;
}
String token = tokenStr.split(" ")[1];
if (StringUtils.isBlank(token)) {

return null;
}
return token;
}

/**
* 格式化Instant时间
*
* @param expiresAt 在到期
* @return {@link String}
*/
public String formatInstantTime(Instant expiresAt) {

// 将Instant转换为系统默认时区的LocalDateTime
LocalDateTime dateTime = LocalDateTime.ofInstant(expiresAt, ZoneId.systemDefault());

// 定义日期时间的格式
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

// 格式化日期时间并打印
return dateTime.format(formatter);
}

}

6. 测试(如何携带JWT)

携带一个正确的JWTgateway发送请求。
JWT写到HeaderAuthorization字段中,添加前缀Bearer(用空格隔开),向gateway微服务所在地址发送请求。

gateway日志输出。

六、后记

颁发JWT都归一个认证服务器管理,校验JWT都归Gateway管理,至于授权,则由各个微服务自己定义。耦合性低、性能较好。

关于授权,可以接着这篇文章。
微服务OAuth 2.1认证授权Demo方案(Spring Security 6)

微服务OAuth 2.1认证授权Demo方案(Spring Security 6)

微服务OAuth 2.1认证授权Demo方案(Spring Security 6)

书接上文
微服务OAuth 2.1认证授权可行性方案(Spring Security 6)

一、介绍

三个微服务

  • auth微服务作为认证服务器,用于颁发JWT
  • gateway微服务作为网关,用于拦截过滤。
  • content微服务作为资源服务器,用于校验授权。

以下是授权相关数据库。

  • user表示用户表
  • role表示角色表
  • user_role关联了用户和角色,表示某个用户是是什么角色。一个用户可以有多个角色
  • menu表示资源权限表。@PreAuthorize("hasAuthority('xxx')")时用的就是这里的code
  • permission关联了角色和资源权限,表示某个角色用于哪些资源访问权限,一个角色有多个资源访问权限。

当我们知道userId,我们就可以知道这个用户可以访问哪些资源,并把这些权限(也就是menu里的code字段)写成数组,写到JWT的负载部分的authorities字段中。当用户携带此JWT访问具有@PreAuthorize("hasAuthority('xxx')")修饰的资源时,我们解析出JWT中的authorities字段,判断是否包含hasAuthority指定的xxx权限,以此来完成所谓的的”授权”。

二、auth微服务代码

1. SecurityConfig

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
package com.xuecheng.auth.config;

import com.nimbusds.jose.jwk.JWKSet;
import com.nimbusds.jose.jwk.RSAKey;
import com.nimbusds.jose.jwk.source.ImmutableJWKSet;
import com.nimbusds.jose.jwk.source.JWKSource;
import com.nimbusds.jose.proc.SecurityContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.http.MediaType;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configurers.AbstractHttpConfigurer;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.oauth2.core.AuthorizationGrantType;
import org.springframework.security.oauth2.core.ClientAuthenticationMethod;
import org.springframework.security.oauth2.core.oidc.OidcScopes;
import org.springframework.security.oauth2.jwt.JwtDecoder;
import org.springframework.security.oauth2.server.authorization.client.InMemoryRegisteredClientRepository;
import org.springframework.security.oauth2.server.authorization.client.RegisteredClient;
import org.springframework.security.oauth2.server.authorization.client.RegisteredClientRepository;
import org.springframework.security.oauth2.server.authorization.config.annotation.web.configuration.OAuth2AuthorizationServerConfiguration;
import org.springframework.security.oauth2.server.authorization.config.annotation.web.configurers.OAuth2AuthorizationServerConfigurer;
import org.springframework.security.oauth2.server.authorization.settings.AuthorizationServerSettings;
import org.springframework.security.oauth2.server.authorization.settings.ClientSettings;
import org.springframework.security.oauth2.server.authorization.settings.TokenSettings;
import org.springframework.security.oauth2.server.authorization.token.JwtEncodingContext;
import org.springframework.security.oauth2.server.authorization.token.OAuth2TokenCustomizer;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.security.web.authentication.LoginUrlAuthenticationEntryPoint;
import org.springframework.security.web.authentication.logout.SecurityContextLogoutHandler;
import org.springframework.security.web.util.matcher.AntPathRequestMatcher;
import org.springframework.security.web.util.matcher.MediaTypeRequestMatcher;

import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;


/**
* 身份验证服务器安全配置
*
* @author mumu
* @date 2024/02/13
*/
//@EnableGlobalMethodSecurity(securedEnabled = true, prePostEnabled = true)
@Configuration
@EnableWebSecurity
public class AuthServerSecurityConfig {



private static KeyPair generateRsaKey() {

KeyPair keyPair;
try {

KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
keyPairGenerator.initialize(2048);
keyPair = keyPairGenerator.generateKeyPair();
} catch (Exception ex) {

throw new IllegalStateException(ex);
}
return keyPair;
}

/**
* 密码编码器
* 用于加密认证服务器client密码和用户密码
*
* @return {@link PasswordEncoder}
*/
@Bean
public PasswordEncoder passwordEncoder() {

// 密码为明文方式
// return NoOpPasswordEncoder.getInstance();
// 或使用 BCryptPasswordEncoder
return new BCryptPasswordEncoder();
}

/**
* 授权服务器安全筛选器链
* <br/>
* 来自Spring Authorization Server示例,用于暴露Oauth2.1端点,一般不影响常规的请求
*
* @param http http
* @return {@link SecurityFilterChain}
* @throws Exception 例外
*/
@Bean
@Order(1)
public SecurityFilterChain authorizationServerSecurityFilterChain(HttpSecurity http)
throws Exception {

OAuth2AuthorizationServerConfiguration.applyDefaultSecurity(http);
http.getConfigurer(OAuth2AuthorizationServerConfigurer.class)
.oidc(Customizer.withDefaults()); // Enable OpenID Connect 1.0
http
// Redirect to the login page when not authenticated from the
// authorization endpoint
.exceptionHandling((exceptions) -> exceptions
.defaultAuthenticationEntryPointFor(
new LoginUrlAuthenticationEntryPoint("/login"),
new MediaTypeRequestMatcher(MediaType.TEXT_HTML)
)
)
// Accept access tokens for User Info and/or Client Registration
.oauth2ResourceServer((resourceServer) -> resourceServer
.jwt(Customizer.withDefaults()));

return http.build();
}

/**
* 默认筛选器链
* <br/>
* 这个才是我们需要关心的过滤链,可以指定哪些请求被放行,哪些请求需要JWT验证
*
* @param http http
* @return {@link SecurityFilterChain}
* @throws Exception 例外
*/
@Bean
@Order(2)
public SecurityFilterChain defaultFilterChain(HttpSecurity http) throws Exception {

http
.authorizeHttpRequests((authorize) ->
authorize
.requestMatchers(new AntPathRequestMatcher("/actuator/**")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/login")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/logout")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/wxLogin")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/register")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/oauth2/**")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/**/*.html")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/**/*.json")).permitAll()
.requestMatchers(new AntPathRequestMatcher("/auth/**")).permitAll()
.anyRequest().authenticated()
)
.csrf(AbstractHttpConfigurer::disable)
//指定logout端点,用于退出登陆,不然二次获取授权码时会自动登陆导致短时间内无法切换用户
.logout(logout -> logout
.logoutUrl("/logout")
.addLogoutHandler(new SecurityContextLogoutHandler())
.logoutSuccessUrl("http://www.51xuecheng.cn")
)
.formLogin(Customizer.withDefaults())
.oauth2ResourceServer(oauth2 -> oauth2.jwt(Customizer.withDefaults())
// .jwt(jwt -> jwt
// .jwtAuthenticationConverter(jwtAuthenticationConverter())
// )
);

return http.build();
}

private JwtAuthenticationConverter jwtAuthenticationConverter() {

JwtAuthenticationConverter jwtConverter = new JwtAuthenticationConverter();
return jwtConverter;
}

/**
* 客户端管理实例
* <br/>
* 来自Spring Authorization Server示例
*
* @return {@link RegisteredClientRepository}
*/
@Bean
public RegisteredClientRepository registeredClientRepository() {

RegisteredClient registeredClient = RegisteredClient.withId(UUID.randomUUID().toString())
.clientId("XcWebApp")
.clientSecret(passwordEncoder().encode("XcWebApp"))
.clientAuthenticationMethod(ClientAuthenticationMethod.CLIENT_SECRET_BASIC)
.authorizationGrantType(AuthorizationGrantType.AUTHORIZATION_CODE)
.authorizationGrantType(AuthorizationGrantType.REFRESH_TOKEN)
.authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
.redirectUri("http://www.51xuecheng.cn")
.redirectUri("http://localhost:63070/auth/wxLogin")
.redirectUri("http://www.51xuecheng.cn/sign.html")
// .postLogoutRedirectUri("http://localhost:63070/login?logout")
.scope("all")
.scope(OidcScopes.OPENID)
.scope(OidcScopes.PROFILE)
.scope("message.read")
.scope("message.write")
.scope("read")
.scope("write")
.clientSettings(ClientSettings.builder().requireAuthorizationConsent(true).build())
.tokenSettings(TokenSettings.builder()
.accessTokenTimeToLive(Duration.ofHours(2)) // 设置访问令牌的有效期
.refreshTokenTimeToLive(Duration.ofDays(3)) // 设置刷新令牌的有效期
.reuseRefreshTokens(true) // 是否重用刷新令牌
.build())
.build();

return new InMemoryRegisteredClientRepository(registeredClient);
}

/**
* jwk源
* <br/>
* 对访问令牌进行签名的示例,里面包含公私钥信息。
*
* @return {@link JWKSource}<{@link SecurityContext}>
*/
@Bean
public JWKSource<SecurityContext> jwkSource() {

KeyPair keyPair = generateRsaKey();
RSAPublicKey publicKey = (RSAPublicKey) keyPair.getPublic();
RSAPrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate();
RSAKey rsaKey = new RSAKey.Builder(publicKey)
.privateKey(privateKey)
.keyID(UUID.randomUUID().toString())
.build();
JWKSet jwkSet = new JWKSet(rsaKey);
return new ImmutableJWKSet<>(jwkSet);
}

/**
* jwt解码器
* <br/>
* JWT解码器,主要就是基于公钥信息来解码
*
* @param jwkSource jwk源
* @return {@link JwtDecoder}
*/
@Bean
public JwtDecoder jwtDecoder(JWKSource<SecurityContext> jwkSource) {

return OAuth2AuthorizationServerConfiguration.jwtDecoder(jwkSource);
}

@Bean
public AuthorizationServerSettings authorizationServerSettings() {

return AuthorizationServerSettings.builder().build();
}

/**
* JWT定制器
* <BR/>
* 可以往JWT从加入额外信息,这里是加入authorities字段,是一个权限数组。
*
* @return {@link OAuth2TokenCustomizer}<{@link JwtEncodingContext}>
*/
@Bean
public OAuth2TokenCustomizer<JwtEncodingContext> jwtTokenCustomizer() {

return context -> {

Authentication authentication = context.getPrincipal();
if (authentication.getPrincipal() instanceof UserDetails userDetails) {

List<String> authorities = userDetails.getAuthorities().stream()
.map(GrantedAuthority::getAuthority)
.collect(Collectors.toList());
context.getClaims().claim("authorities", authorities);
}
};
}
}

这里需要注意几点

  • 使用BCryptPasswordEncoder密码加密,在设置clientSecret时需要手动使用密码编码器。
  • jwtTokenCustomizer解析UserDetails然后往JWT中添加authorities字段,为了后面的授权。

2. UserDetailsService

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.xuecheng.ucenter.service.impl;


import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xuecheng.ucenter.mapper.XcMenuMapper;
import com.xuecheng.ucenter.mapper.XcUserMapper;
import com.xuecheng.ucenter.model.dto.AuthParamsDto;
import com.xuecheng.ucenter.model.dto.XcUserExt;
import com.xuecheng.ucenter.model.po.XcMenu;
import com.xuecheng.ucenter.model.po.XcUser;
import com.xuecheng.ucenter.service.AuthService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@Component
@Slf4j
public class UserServiceImpl implements UserDetailsService {


@Autowired
private MyAuthService myAuthService;
@Autowired
XcMenuMapper xcMenuMapper;


/**
* 用户统一认证
*
* @param s 用户信息Json字符串
* @return {@link UserDetails}
* @throws UsernameNotFoundException 找不到用户名异常
*/
@Override
public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {

XcUserExt xcUserExt = myAuthService.execute(username);
return getUserPrincipal(xcUserExt);
}

public UserDetails getUserPrincipal(XcUserExt user){

//用户权限,如果不加报Cannot pass a null GrantedAuthority collection
List<XcMenu> xcMenus = xcMenuMapper.selectPermissionByUserId(user.getId());
String[] permissions = {
"read"};
if (ObjectUtils.isNotEmpty(xcMenus)){

permissions = xcMenus.stream().map(XcMenu::getCode).toList().toArray(String[]::new);
log.info("权限如下:{}", Arrays.toString(permissions));
}
//为了安全在令牌中不放密码
String password = user.getPassword();
user.setPassword(null);
//将user对象转json
String userString = JSON.toJSONString(user);
//创建UserDetails对象
return User.withUsername(userString).password(password).authorities(permissions).build();
}
}

这里需要注意几点

  • username就是前端/auth/login的时候输入的账户名。
  • myAuthService.execute(username)不抛异常,就默认表示账户存在,此时将password加入UserDetails 并返回,Spring Authorization Server对比校验两个密码。
  • myAuthService.execute(username)根据username获取用户信息返回,将用户信息存入withUsername中,Spring Authorization Server默认会将其加入到JWT中。
  • 现在Spring Authorization Server默认不会把authorities(permissions)写入JWT,需要配合OAuth2TokenCustomizer手动写入。

3. 总结

这样,auth微服务颁发的JWT,现在就会包含authorities字段。示例如下

language-json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{

"active": true,
"sub": "{\"cellphone\":\"17266666637\",\"createTime\":\"2024-02-13 10:33:13\",\"email\":\"1138882663@qq.com\",\"id\":\"012f3a90-2bc9-4a2c-82a3-f9777c9ac10a\",\"name\":\"xiamu\",\"nickname\":\"xiamu\",\"permissions\":[],\"status\":\"1\",\"updateTime\":\"2024-02-13 10:33:13\",\"username\":\"xiamu\",\"utype\":\"101001\",\"wxUnionid\":\"test\"}",
"aud": [
"XcWebApp"
],
"nbf": 1707830437,
"scope": "all",
"iss": "http://localhost:63070/auth",
"exp": 1707837637,
"iat": 1707830437,
"jti": "8a657c60-968f-4d98-8a4c-22a7b4ecd333",
"authorities": [
"xc_sysmanager",
"xc_sysmanager_company",
"xc_sysmanager_doc",
"xc_sysmanager_log",
"xc_teachmanager_course_list"
],
"client_id": "XcWebApp",
"token_type": "Bearer"
}

三、gateway微服务代码

1. 统一处理CORS问题

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@EnableWebFluxSecurity
@Configuration
public class SecurityConfig {

//安全拦截配置
@Bean
public SecurityWebFilterChain webFluxSecurityFilterChain(ServerHttpSecurity http) throws Exception {

return http
.cors(cors -> cors.configurationSource(corsConfigurationSource()))
.authorizeExchange(exchanges ->
exchanges
.pathMatchers("/**").permitAll()
.anyExchange().authenticated()
)
.oauth2ResourceServer(oauth2 -> oauth2.jwt(Customizer.withDefaults()))
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}

@Bean
public CorsConfigurationSource corsConfigurationSource() {

CorsConfiguration corsConfig = new CorsConfiguration();
corsConfig.addAllowedOriginPattern("*"); // 允许任何源
corsConfig.addAllowedMethod("*"); // 允许任何HTTP方法
corsConfig.addAllowedHeader("*"); // 允许任何HTTP头
corsConfig.setAllowCredentials(true); // 允许证书(cookies)
corsConfig.setMaxAge(3600L); // 预检请求的缓存时间(秒)

UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", corsConfig); // 对所有路径应用这个配置
return source;
}
}

这里需要注意几点

  • 书接上文,这里虽然用了oauth2.jwt(Customizer.withDefaults()),但实际上基于远程auth微服务开放的jwkSetEndpoint配置的JwtDecoder
  • .cors(cors -> cors.configurationSource(corsConfigurationSource()))一次性处理CORS问题。

四、content微服务代码

1. controller

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@PreAuthorize("hasAuthority('xc_teachmanager_course_list')")
@ApiResponse(responseCode = "200", description = "Successfully retrieved user")
@Operation(summary = "查询课程信息列表")
@PostMapping("/course/list")
public PageResult<CourseBase> list(
PageParams pageParams,
@Parameter(description = "请求具体内容") @RequestBody(required = false) QueryCourseParamsDto dto){

SecurityUtil.XcUser xcUser = SecurityUtil.getUser();
if (xcUser != null){

System.out.println(xcUser.getUsername());
System.out.println(xcUser.toString());
}
return courseBaseInfoService.queryCourseBaseList(pageParams, dto);
}

使用了@PreAuthorize("hasAuthority('xc_teachmanager_course_list')")修饰的controller资源。

2. SecurityConfig

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Configuration
@EnableWebSecurity
@EnableMethodSecurity
public class SecurityConfig {

//安全拦截配置
@Bean
public SecurityFilterChain defaultFilterChain(HttpSecurity http) throws Exception {

http
.authorizeHttpRequests((authorize) ->
authorize
.requestMatchers("/**").permitAll()
.anyRequest().authenticated()
)
.csrf(AbstractHttpConfigurer::disable)
.oauth2ResourceServer(oauth -> oauth.jwt(jwt -> jwt.jwtAuthenticationConverter(jwtAuthenticationConverter())));
return http.build();
}

private JwtAuthenticationConverter jwtAuthenticationConverter() {

JwtAuthenticationConverter jwtConverter = new JwtAuthenticationConverter();
jwtConverter.setJwtGrantedAuthoritiesConverter(jwt -> {

// 从JWT的claims中提取权限信息
List<String> authorities = jwt.getClaimAsStringList("authorities");
if (authorities == null) {

return Collections.emptyList();
}
return authorities.stream()
.map(SimpleGrantedAuthority::new)
.collect(Collectors.toList());
});
return jwtConverter;
}
}

需要注意几点

  • 使用@EnableMethodSecurity@PreAuthorize生效
  • gateway一样,需要基于远程auth微服务开放的jwkSetEndpoint配置JwtDecoder
  • 指定JwtAuthenticationConverter ,让anyRequest().authenticated()需要验证的请求,除了完成默认的JWT验证外,还需要完成JwtAuthenticationConverter 指定逻辑。
  • JwtAuthenticationConverter 中将JWTauthorities部分形成数组后写入GrantedAuthorities,这正是spring security6用于校验@PreAuthorize的字段。

3. 解析JWT Utils

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Slf4j
public class SecurityUtil {

public static XcUser getUser(){

Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication == null){

return null;
}
if (authentication instanceof JwtAuthenticationToken) {

JwtAuthenticationToken jwtAuth = (JwtAuthenticationToken) authentication;
System.out.println(jwtAuth);
Map<String, Object> tokenAttributes = jwtAuth.getTokenAttributes();
System.out.println(tokenAttributes);
Object sub = tokenAttributes.get("sub");
return JSON.parseObject(sub.toString(), XcUser.class);
}
return null;
}
@Data
public static class XcUser implements Serializable {


private static final long serialVersionUID = 1L;

private String id;

private String username;

private String password;

private String salt;

private String name;
private String nickname;
private String wxUnionid;
private String companyId;
/**
* 头像
*/
private String userpic;

private String utype;

private LocalDateTime birthday;

private String sex;

private String email;

private String cellphone;

private String qq;

/**
* 用户状态
*/
private String status;

private LocalDateTime createTime;

private LocalDateTime updateTime;

}
}

JWT的信息解析回XcUser ,相当于用户携带JWT访问后端,后端可以根据JWT获取此用户的信息。当然,你可以尽情的自定义,扩展。

4. 总结

当用户携带JWT访问需要权限的资源时,现在可以正常的校验权限了。

五、一些坑

  1. RegisteredClient时注册那么多redirectUri是因为debug了很久,才发现获取授权码和获取JWT时,redirect_uri参数需要一致。
  2. cors问题,spring secuity6似乎会一开始直接默认拒绝cors,导致跨域请求刚到gateway就寄了,到不了content微服务,即使content微服务配置了CORS的处理方案,也无济于事。
多级缓存架构(五)缓存同步

多级缓存架构(五)缓存同步

通过本文章,可以完成多级缓存架构中的缓存同步。

一、Canal服务

1. mysql添加canal用户

连接在上一次multiCache项目中运行的mysql容器,创建canal用户。

language-sql
1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

2. mysql配置文件

docker/mysql/conf/my.cnf添加如下配置

language-txt
1
2
3
4
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima
binlog_format=row

3. canal配置文件

添加canal服务块到docker-compose.yml,如下

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
canal:
container_name: canal
image: canal/canal-server:v1.1.7
volumes:
- ./canal/logs:/home/admin/canal-server/logs
- ./canal/conf:/home/admin/canal-server/conf
ports:
- "11111:11111"
depends_on:
- mysql
networks:
multi-cache:
ipv4_address: 172.30.3.7
language-bash
1
docker pull canal/canal-server:v1.1.7

任意启动一个canal-server容器,将里面的/home/admin/canal-server/conf文件夹复制到宿主机,对应docker/canal/conf文件夹。
删除此临时容器。

修改docker/canal/conf/canal.properties如下条目

language-dart
1
2
canal.destinations=example
canal.instance.tsdb.enable=true

修改docker/canal/conf/example/instance.properties如下条目

language-dart
1
2
3
4
5
6
7
canal.instance.master.address=172.30.3.2:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.tsdb.enable=true
canal.instance.gtidon=false
canal.instance.filter.regex=heima\\..*

二、引入依赖

pom.xml

language-xml
1
2
3
4
5
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>

application.yml

language-yml
1
2
3
canal:
destination: example
server: 172.30.3.7:11111

三、监听Canal消息

这是canal-spring-boot-starter官方仓库,含使用文档

新建canal.ItemHandler类,内容如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.heima.item.canal;

import com.github.benmanes.caffeine.cache.Cache;
import com.heima.item.config.RedisHandler;
import com.heima.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@CanalTable(value = "tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {


@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long, Item> itemCache;

@Override
public void insert(Item item) {

itemCache.put(item.getId(), item);
redisHandler.saveItem(item);
}

@Override
public void update(Item before, Item after) {

itemCache.put(after.getId(), after);
redisHandler.saveItem(after);
}

@Override
public void delete(Item item) {

itemCache.invalidate(item.getId());
redisHandler.deleteItemById(item.getId());
}
}

修改pojo.Item类,如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.heima.item.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;

import java.util.Date;

@Data
@TableName("tb_item")
public class Item {

@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}

四、运行

到此为止,docker-compose.yml内容应该如下

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
version: '3.8'

networks:
multi-cache:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.30.3.0/24

services:
mysql:
container_name: mysql
image: mysql:8
volumes:
- ./mysql/conf/my.cnf:/etc/mysql/conf.d/my.cnf
- ./mysql/data:/var/lib/mysql
- ./mysql/logs:/logs
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=1009
networks:
multi-cache:
ipv4_address: 172.30.3.2

nginx:
container_name: nginx
image: nginx:stable
volumes:
- ./nginx/conf/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./nginx/dist:/usr/share/nginx/dist
ports:
- "8080:8080"
networks:
multi-cache:
ipv4_address: 172.30.3.3

canal:
container_name: canal
image: canal/canal-server:v1.1.7
volumes:
- ./canal/logs:/home/admin/canal-server/logs
- ./canal/conf:/home/admin/canal-server/conf
ports:
- "11111:11111"
depends_on:
- mysql
networks:
multi-cache:
ipv4_address: 172.30.3.7

openresty1:
container_name: openresty1
image: openresty/openresty:1.21.4.3-3-jammy-amd64
volumes:
- ./openresty1/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
- ./openresty1/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./openresty1/lua:/usr/local/openresty/nginx/lua
- ./openresty1/lualib/common.lua:/usr/local/openresty/lualib/common.lua
networks:
multi-cache:
ipv4_address: 172.30.3.11

redis:
container_name: redis
image: redis:7.2
volumes:
- ./redis/redis.conf:/usr/local/etc/redis/redis.conf
ports:
- "6379:6379"
command: [ "redis-server", "/usr/local/etc/redis/redis.conf" ]
networks:
multi-cache:
ipv4_address: 172.30.3.21

删除原来的multiCache,重新启动各项服务。

language-bash
1
docker-compose -p multi-cache up -d

启动springboot程序。

五、测试

springboot不断输入类似如下日志,属于正常监听canal消息中。

language-txt
1
2
3
09:27:17:175  INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient  : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]
09:27:18:177 INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]
09:27:19:178 INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]

访问http://localhost:8081/item/10001,此时信息为tomcat查询数据库所得数据,而后存入Caffeine缓存。
访问http://localhost:8080/item.html?id=10001,此时信息为Redis缓存数据。

然后,
访问http://localhost:8081/来到商品管理页面。

修改id=10001的数据的商品分类

确认后
springboot日志出现类似如下日志

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
09:31:29:234  INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient  : 获取消息 Message[id=1,entries=[header {
version: 1
logfileName: "binlog.000007"
logfileOffset: 236
serverId: 1
serverenCode: "UTF-8"
executeTime: 1705051889000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 93
}
entryType: TRANSACTIONBEGIN
storeValue: " \r"
, header {
version: 1
logfileName: "binlog.000007"
logfileOffset: 411
serverId: 1
serverenCode: "UTF-8"
executeTime: 1705051889000
sourceType: MYSQL
schemaName: "heima"
tableName: "tb_item"
eventLength: 626
eventType: UPDATE
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA
storeValue: "\bV\020\002P\000b\332\n\n&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\00510001R\006bigint\nd\b\001\020\f\032\005title \000(\0000\000BCRIMOWA 21\345\257\270\346\211\230\350\277\220\347\256\261\346\213\211\346\235\206\347\256\261 SALSA AIR\347\263\273\345\210\227\346\236\234\347\273\277\350\211\262 820.70.36.4R\fvarchar(264)\n)\b\002\020\f\032\004name \000(\0000\000B\tSALSA AIRR\fvarchar(128)\n)\b\003\020\373\377\377\377\377\377\377\377\377\001\032\005price \000(\0000\000B\00516900R\006bigint\n\226\001\b\004\020\f\032\005image \000(\0000\000Buhttps://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webpR\fvarchar(200)\n0\b\005\020\f\032\bcategory \000(\0000\000B\f\346\213\211\346\235\206\347\256\261777R\fvarchar(200)\n\'\b\006\020\f\032\005brand \000(\0000\000B\006RIMOWAR\fvarchar(100)\nG\b\a\020\f\032\004spec \000(\0000\000B\'{\"\351\242\234\350\211\262\": \"\347\272\242\350\211\262\", \"\345\260\272\347\240\201\": \"26\345\257\270\"}R\fvarchar(200)\n\032\b\b\020\004\032\006status \000(\0000\000B\0011R\003int\n6\b\t\020]\032\vcreate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime\n6\b\n\020]\032\vupdate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime\022&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\00510001R\006bigint\022d\b\001\020\f\032\005title \000(\0000\000BCRIMOWA 21\345\257\270\346\211\230\350\277\220\347\256\261\346\213\211\346\235\206\347\256\261 SALSA AIR\347\263\273\345\210\227\346\236\234\347\273\277\350\211\262 820.70.36.4R\fvarchar(264)\022)\b\002\020\f\032\004name \000(\0000\000B\tSALSA AIRR\fvarchar(128)\022)\b\003\020\373\377\377\377\377\377\377\377\377\001\032\005price \000(\0000\000B\00516900R\006bigint\022\226\001\b\004\020\f\032\005image \000(\0000\000Buhttps://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webpR\fvarchar(200)\0220\b\005\020\f\032\bcategory \000(\0010\000B\f\346\213\211\346\235\206\347\256\261888R\fvarchar(200)\022\'\b\006\020\f\032\005brand \000(\0000\000B\006RIMOWAR\fvarchar(100)\022G\b\a\020\f\032\004spec \000(\0000\000B\'{\"\351\242\234\350\211\262\": \"\347\272\242\350\211\262\", \"\345\260\272\347\240\201\": \"26\345\257\270\"}R\fvarchar(200)\022\032\b\b\020\004\032\006status \000(\0000\000B\0011R\003int\0226\b\t\020]\032\vcreate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime\0226\b\n\020]\032\vupdate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime"
],raw=false,rawEntries=[]]
09:31:30:572 INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=2,entries=[header {
version: 1
logfileName: "binlog.000007"
logfileOffset: 1037
serverId: 1
serverenCode: "UTF-8"
executeTime: 1705051889000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\00287"
],raw=false,rawEntries=[]]

这里可以先用redis连接工具查询数据,发现rediis已被更新。

再次访问http://localhost:8081/item/10001直接向springbootcontroller发送请求,发现caffeine数据更新,并且springboot日志没有出现查询记录,说明走的是caffeine

多级缓存架构(三)OpenResty Lua缓存

多级缓存架构(三)OpenResty Lua缓存

通过本文章,可以完成多级缓存架构中的Lua缓存。

一、nginx服务

docker/docker-compose.yml中添加nginx服务块。

language-yaml
1
2
3
4
5
6
7
8
9
10
11
12
nginx:
container_name: nginx
image: nginx:stable
volumes:
- ./nginx/conf/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./nginx/dist:/usr/share/nginx/dist
ports:
- "8080:8080"
networks:
multi-cache:
ipv4_address: 172.30.3.3

删除原来docker里的multiCache项目并停止springboot应用。

nginx部分配置如下,监听端口为8080,并且将请求反向代理至172.30.3.11,下一小节,将openresty固定在172.30.3.11

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
upstream nginx-cluster {
server 172.30.3.11;
}

server {
listen 8080;
listen [::]:8080;
server_name localhost;

location /api {
proxy_pass http://nginx-cluster;
}
}

重新启动multiCache看看nginx前端网页效果。

language-css
1
docker-compose -p multi-cache up -d

访问http://localhost:8080/item.html?id=10001查询id=10001商品页

这里是假数据,前端页面会向/api/item/10001发送数据请求。

二、OpenResty服务

1. 服务块定义

docker/docker-compose.yml中添加openresty1服务块。

language-yaml
1
2
3
4
5
6
7
8
9
10
11
openresty1:
container_name: openresty1
image: openresty/openresty:1.21.4.3-3-jammy-amd64
volumes:
- ./openresty1/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
- ./openresty1/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./openresty1/lua:/usr/local/openresty/nginx/lua
- ./openresty1/lualib/common.lua:/usr/local/openresty/lualib/common.lua
networks:
multi-cache:
ipv4_address: 172.30.3.11

2. 配置修改

前端向后端发送/api/item/10001请求关于id=10001商品信息。

根据nginx的配置内容,这个请求首先被nginx拦截,反向代理到172.30.3.11 (即openresty1)。

language-txt
1
2
3
4
5
6
7
8
9
upstream nginx-cluster {
server 172.30.3.11;
}

server {
location /api {
proxy_pass http://nginx-cluster;
}
}

openresty1收到的也是/api/item/10001,同时,openresty/api/item/(\d+)请求代理到指定lua程序,在lua程序中完成数据缓存。

因此,openrestyconf/conf.d/default.conf如下

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
upstream tomcat-cluster {
hash $request_uri;
server 172.30.3.4:8081;
# server 172.30.3.5:8081;
}

server {
listen 80;
listen [::]:80;
server_name localhost;

# intercept /item and join lua
location ~ /api/item/(\d+) {
default_type application/json;
content_by_lua_file lua/item.lua;
}

# intercept lua and redirect to back-end
location /path/ {
rewrite ^/path/(.*)$ /$1 break;
proxy_pass http://tomcat-cluster;
}

error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/share/nginx/dist;
}
}

conf/nginx.confhttp块最后添加3行,引入依赖。

language-txt
1
2
3
4
5
6
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
#本地缓存
lua_shared_dict item_cache 150m;

3. Lua程序编写

common.lua被挂载到lualib,表示可以被其他lua当做库使用,内容如下

language-lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
-- 创建一个本地缓存对象item_cache
local item_cache = ngx.shared.item_cache;

-- 函数,向openresty本身发送类似/path/item/10001请求,根据conf配置,将被删除/path前缀并代理至tomcat程序
local function read_get(path, params)
local rsp = ngx.location.capture('/path'..path,{

method = ngx.HTTP_GET,
args = params,
})
if not rsp then
ngx.log(ngx.ERR, "http not found, path: ", path, ", args: ", params);
ngx.exit(404)
end
return rsp.body
end

-- 函数,如果本地有缓存,使用缓存,如果没有代理到tomcat然后将数据存入缓存
local function read_data(key, expire, path, params)
-- query local cache
local rsp = item_cache:get(key)
-- query tomcat
if not rsp then
ngx.log(ngx.ERR, "redis cache miss, try tomcat, key: ", key)
rsp = read_get(path, params)
end
-- write into local cache
item_cache:set(key, rsp, expire)
return rsp
end

local _M = {

read_data = read_data
}

return _M

item.lua是处理来自形如/api/item/10001请求的程序,内容如下

language-lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- include
local commonUtils = require('common')
local cjson = require("cjson")

-- get url params 10001
local id = ngx.var[1]
-- redirect item, 缓存过期时间1800s, 适合长时间不改变的数据
local itemJson = commonUtils.read_data("item:id:"..id, 1800,"/item/"..id,nil)
-- redirect item/stock, 缓存过期时间4s, 适合经常改变的数据
local stockJson = commonUtils.read_data("item:stock:id:"..id, 4 ,"/item/stock/"..id, nil)
-- json2table
local item = cjson.decode(itemJson)
local stock = cjson.decode(stockJson)
-- combine item and stock
item.stock = stock.stock
item.sold = stock.sold
-- return result
ngx.say(cjson.encode(item))

4. 总结

  1. 这里luaitem(tb_item表)和stock(tb_stock表)两个信息都有缓存,并使用cjson库将两者合并后返回到前端。
  2. 关于expire时效性的问题,如果后台改变了数据,但是openresty关于此数据的缓存未过期,前端得到的是旧数据
  3. 大致来说openresty = nginx + lua,不仅具有nginx反向代理的能力,还能介入lua程序进行扩展。

三、运行

到此为止,docker-compose.yml应该如下

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
version: '3.8'

networks:
multi-cache:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.30.3.0/24

services:
mysql:
container_name: mysql
image: mysql:8
volumes:
- ./mysql/conf/my.cnf:/etc/mysql/conf.d/my.cnf
- ./mysql/data:/var/lib/mysql
- ./mysql/logs:/logs
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=1009
networks:
multi-cache:
ipv4_address: 172.30.3.2

nginx:
container_name: nginx
image: nginx:stable
volumes:
- ./nginx/conf/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./nginx/dist:/usr/share/nginx/dist
ports:
- "8080:8080"
networks:
multi-cache:
ipv4_address: 172.30.3.3

openresty1:
container_name: openresty1
image: openresty/openresty:1.21.4.3-3-jammy-amd64
volumes:
- ./openresty1/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
- ./openresty1/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./openresty1/lua:/usr/local/openresty/nginx/lua
- ./openresty1/lualib/common.lua:/usr/local/openresty/lualib/common.lua
networks:
multi-cache:
ipv4_address: 172.30.3.11

启动各项服务

language-bash
1
docker-compose -p multi-cache up -d

启动springboot程序。

四、测试

清空openresty容器日志。
访问http://localhost:8080/item.html?id=10001
查看openresty容器日志,可以看到两次commonUtils.read_data都没有缓存,于是代理到tomcat,可以看到springboot日志出现查询相关记录。

language-txt
1
2
3
2024-01-12 11:45:53 2024/01/12 03:45:53 [error] 7#7: *1 [lua] common.lua:99: read_data(): redis cache miss, try tomcat, key: item:id:10001, client: 172.30.3.3, server: localhost, request: "GET /api/item/10001 HTTP/1.0", host: "nginx-cluster", referrer: "http://localhost:8080/item.html?id=10001"
2024-01-12 11:45:53 2024/01/12 03:45:53 [error] 7#7: *1 [lua] common.lua:99: read_data(): redis cache miss, try tomcat, key: item:stock:id:10001 while sending to client, client: 172.30.3.3, server: localhost, request: "GET /api/item/10001 HTTP/1.0", host: "nginx-cluster", referrer: "http://localhost:8080/item.html?id=10001"
2024-01-12 11:45:53 172.30.3.3 - - [12/Jan/2024:03:45:53 +0000] "GET /api/item/10001 HTTP/1.0" 200 486 "http://localhost:8080/item.html?id=10001" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0"

再次访问此网址,强制刷新+禁用浏览器缓存+更换浏览器
间隔超过4s但小于1800s时,日志如下,只出现一次miss。

language-txt
1
2
2024-01-12 11:48:04 2024/01/12 03:48:04 [error] 7#7: *4 [lua] common.lua:99: read_data(): redis cache miss, try tomcat, key: item:stock:id:10001, client: 172.30.3.3, server: localhost, request: "GET /api/item/10001 HTTP/1.0", host: "nginx-cluster", referrer: "http://localhost:8080/item.html?id=10001"
2024-01-12 11:48:04 172.30.3.3 - - [12/Jan/2024:03:48:04 +0000] "GET /api/item/10001 HTTP/1.0" 200 486 "http://localhost:8080/item.html?id=10001" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0"

再次访问此网址,强制刷新+禁用浏览器缓存+更换浏览器
间隔小于4s,日志如下,未出现miss。

language-txt
1
2024-01-12 11:49:16 172.30.3.3 - - [12/Jan/2024:03:49:16 +0000] "GET /api/item/10001 HTTP/1.0" 200 486 "http://localhost:8080/item.html?id=10001" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0"

五、高可用集群

1. openresty

对于openresty高可用,可以部署多个openresty docker实例,并在nginxdocker/nginx/conf/conf.d/default.confupstream nginx-cluster将多个openresty地址添加进去即可。比如

language-txt
1
2
3
4
5
6
7
upstream nginx-cluster {
hash $request_uri;
# hash $request_uri consistent;
server 172.30.3.11;
server 172.30.3.12;
server 172.30.3.13;
}

多个openresty 无论是conf还是lua都保持一致即可。
并且使用hash $request_uri负载均衡作为反向代理策略,防止同一请求被多个实例缓存数据。

2. tomcat

对于springboot程序高可用,也是类似。可以部署多个springboot docker实例,并在openresty docker/openresty1/conf/conf.d/default.confupstream nginx-cluster将多个springboot地址添加进去即可。比如

language-txt
1
2
3
4
5
upstream tomcat-cluster {
hash $request_uri;
server 172.30.3.4:8081;
server 172.30.3.5:8081;
}
多级缓存架构(二)Caffeine进程缓存

多级缓存架构(二)Caffeine进程缓存

通过本文章,可以完成多级缓存架构中的进程缓存。

一、引入依赖

item-service中引入caffeine依赖

language-xml
1
2
3
4
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

二、实现进程缓存

这是Caffeine官方文档地址

1. 配置Config类

创建config.CaffeineConfig

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class CaffeineConfig {

@Bean
public Cache<Long, Item> itemCache(){

return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}

@Bean
public Cache<Long, ItemStock> stockCache(){

return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
}

2. 修改controller

ItemController中注入两个Cache对象,并修改业务逻辑

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@RestController
@RequestMapping("item")
public class ItemController {


@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
@Autowired
private Cache<Long, Item> itemCache;
@Autowired
private Cache<Long, ItemStock> stockCache;

@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id){

return itemCache.get(id, key->
itemService.query()
.ne("status", 3).eq("id", id)
.one()
);
// return itemService.query()
// .ne("status", 3).eq("id", id)
// .one();
}

@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id){

return stockCache.get(id, key->
stockService.getById(id)
);
// return stockService.getById(id);
}
}

三、运行

Idea结合Docker将springboot放入docker容器中运行,并指定使用multi-cache_multi-cache网络,以及固定172.30.3.4地址。
详细参考如下文章

启动好后,可以看到springboot容器和mysql容器处于同一网络下。(Docker Desktop for Windows插件PortNavigator)

四、测试

访问http://localhost:8081/item/10001可以看到springboot日志输出如下

language-txt
1
2
3
02:45:58:841 DEBUG 1 --- [nio-8081-exec-1] c.h.item.mapper.ItemMapper.selectOne     : ==>  Preparing: SELECT id,name,title,price,image,category,brand,spec,status,create_time,update_time FROM tb_item WHERE (status <> ? AND id = ?)
02:45:58:889 DEBUG 1 --- [nio-8081-exec-1] c.h.item.mapper.ItemMapper.selectOne : ==> Parameters: 3(Integer), 10001(Long)
02:45:58:951 DEBUG 1 --- [nio-8081-exec-1] c.h.item.mapper.ItemMapper.selectOne : <== Total: 1

当我们二次访问此网址,强制刷新+禁用浏览器缓存+更换浏览器,springboot日志都没有新的查询记录,说明使用了Caffeine缓存。

基于Docker Compose单机实现多级缓存架构2024

基于Docker Compose单机实现多级缓存架构2024

一、环境参考

Name Version
Docker Desktop for Windows 4.23.0
Openjdk 8
MySQL 8.2.0
Redis 7.2
Canal 1.1.7
OpenResty 1.21.4.3-3-jammy-amd64
Lua -
Caffeine -

二、专栏简介

多级缓存实现过程比较长,将拆分为多个文章分步讲述。如果一切顺利,大致会得到如下一个多级缓存架构:

本专栏主要对Lua缓存Redis缓存Caffeine缓存进行实践,以及缓存同步实践。依次为以下几篇:

  1. 多级缓存架构(一)项目初始化
  2. 多级缓存架构(二)Caffeine进程缓存
  3. 多级缓存架构(三)OpenResty Lua缓存
  4. 多级缓存架构(四)Redis缓存
  5. 多级缓存架构(五)缓存同步

三、扩展

对于高可用,集群等扩展,例如下图的构造,本专栏只包含部分展开但并不提供实践指导

多级缓存架构(四)Redis缓存

多级缓存架构(四)Redis缓存

通过本文章,可以完成多级缓存架构中的Redis缓存。

一、Redis服务

docker/docker-compose.yml中,添加redis服务块

language-yml
1
2
3
4
5
6
7
8
9
10
11
redis:
container_name: redis
image: redis:7.2
volumes:
- ./redis/redis.conf:/usr/local/etc/redis/redis.conf
ports:
- "6379:6379"
command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
networks:
multi-cache:
ipv4_address: 172.30.3.21

二、Redis缓存预热

spirngboot项目启动时,将固定的热点数据提前加载到redis中。

1. 引入依赖

pom.xml添加如下依赖

language-xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.4.RELEASE</version> <!-- 或更高版本 -->
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.41</version>
</dependency>

application.yml添加如下配置

language-yml
1
2
3
spring:
redis:
host: 172.30.3.21

2. handler类实现

新建config.RedisHandler类,内容如下,主要是重写afterPropertiesSet,完成缓存预热逻辑,saveItemdeleteItemById函数给之后的章节使用。

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
public class RedisHandler implements InitializingBean {

@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
@Override
public void afterPropertiesSet() throws Exception {

List<Item> itemList = itemService.list();
for (Item item : itemList) {

String json = JSON.toJSONString(item);
redisTemplate.opsForValue().set("item:id:"+item.getId(), json);
}
List<ItemStock> stockList = stockService.list();
for (ItemStock stock : stockList) {

String json = JSON.toJSONString(stock);
redisTemplate.opsForValue().set("item:stock:id:"+stock.getId(), json);
}
}

public void saveItem(Item item){

String json = JSON.toJSONString(item);
redisTemplate.opsForValue().set("item:id:"+item.getId(), json);
}

public void deleteItemById(Long id){

redisTemplate.delete("item:id:"+id);
}
}

三、整合Redis缓存

改进openrestydocker/openresty1/lualib/common.lua,如下

language-lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
local redis = require('resty.redis')
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)
-- 创建一个本地缓存对象item_cache
local item_cache = ngx.shared.item_cache;

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end

-- 函数,向openresty本身发送类似/path/item/10001请求,根据conf配置,将被删除/path前缀并代理至tomcat程序
local function read_get(path, params)
local rsp = ngx.location.capture('/path'..path,{

method = ngx.HTTP_GET,
args = params,
})
if not rsp then
ngx.log(ngx.ERR, "http not found, path: ", path, ", args: ", params);
ngx.exit(404)
end
return rsp.body
end

-- 函数,如果本地有缓存,使用缓存,如果没有代理到tomcat然后将数据存入缓存
local function read_data(key, expire, path, params)
-- query local cache
local rsp = item_cache:get(key)
-- query redis
if not rsp then
ngx.log(ngx.ERR, "local cache miss, try redis, key: ", key)
rsp = read_redis("172.30.3.21", 6379, key)
if not rsp then
ngx.log(ngx.ERR, "redis cache miss, try tomcat, key: ", key)
rsp = read_get(path, params)
end
end
-- write into local cache
item_cache:set(key, rsp, expire)
return rsp
end

local _M = {

read_get = read_get,
read_redis = read_redis,
read_data = read_data
}

return _M

item.lua不需要用改动。

四、运行

到此为止,docker-compose.yml内容应该如下

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
version: '3.8'

networks:
multi-cache:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.30.3.0/24

services:
mysql:
container_name: mysql
image: mysql:8
volumes:
- ./mysql/conf/my.cnf:/etc/mysql/conf.d/my.cnf
- ./mysql/data:/var/lib/mysql
- ./mysql/logs:/logs
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=1009
networks:
multi-cache:
ipv4_address: 172.30.3.2

nginx:
container_name: nginx
image: nginx:stable
volumes:
- ./nginx/conf/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./nginx/dist:/usr/share/nginx/dist
ports:
- "8080:8080"
networks:
multi-cache:
ipv4_address: 172.30.3.3

openresty1:
container_name: openresty1
image: openresty/openresty:1.21.4.3-3-jammy-amd64
volumes:
- ./openresty1/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
- ./openresty1/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./openresty1/lua:/usr/local/openresty/nginx/lua
- ./openresty1/lualib/common.lua:/usr/local/openresty/lualib/common.lua
networks:
multi-cache:
ipv4_address: 172.30.3.11

redis:
container_name: redis
image: redis:7.2
volumes:
- ./redis/redis.conf:/usr/local/etc/redis/redis.conf
ports:
- "6379:6379"
command: [ "redis-server", "/usr/local/etc/redis/redis.conf" ]
networks:
multi-cache:
ipv4_address: 172.30.3.21

删除原来的multiCache,重新启动各项服务。

language-bash
1
docker-compose -p multi-cache up -d

启动springboot程序。

五、测试

1. redis缓存预热

springboot程序启动后,出现查询日志,查看redis数据库发现自动存入了数据。

2. redis缓存命中

清空openresty容器日志,访问http://localhost:8080/item.html?id=10001,查看日志,发现两次commonUtils.read_data都只触发到查询redis,没到查询tomcat

language-txt
1
2
3
2024-01-12 16:06:18 2024/01/12 08:06:18 [error] 7#7: *1 [lua] common.lua:59: read_data(): local cache miss, try redis, key: item:id:10001, client: 172.30.3.3, server: localhost, request: "GET /api/item/10001 HTTP/1.0", host: "nginx-cluster", referrer: "http://localhost:8080/item.html?id=10001"
2024-01-12 16:06:18 2024/01/12 08:06:18 [error] 7#7: *1 [lua] common.lua:59: read_data(): local cache miss, try redis, key: item:stock:id:10001, client: 172.30.3.3, server: localhost, request: "GET /api/item/10001 HTTP/1.0", host: "nginx-cluster", referrer: "http://localhost:8080/item.html?id=10001"
2024-01-12 16:06:18 172.30.3.3 - - [12/Jan/2024:08:06:18 +0000] "GET /api/item/10001 HTTP/1.0" 200 466 "http://localhost:8080/item.html?id=10001" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0"

查看springboot程序日志,也没有查询记录,说明redis缓存命中成功。

六、高可用集群

对于redis高可用集群,可以参考以下专栏文章。
https://blog.csdn.net/m0_51390969/category_12546314.html?spm=1001.2014.3001.5482

多级缓存架构(一)项目初始化

多级缓存架构(一)项目初始化

一、项目克隆

克隆此项目到本地
https://github.com/Xiamu-ssr/MultiCache
来到start目录下,分别有以下文件夹

  • docker:docker相关文件
  • item-service:springboot项目

二、数据库准备

docker/docker-compose.yml中已经定义好如下mysql

language-yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
mysql:
container_name: mysql
image: mysql:8
volumes:
- ./mysql/conf/my.cnf:/etc/mysql/conf.d/my.cnf
- ./mysql/data:/var/lib/mysql
- ./mysql/logs:/logs
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=1009
networks:
multi-cache:
ipv4_address: 172.30.3.2

my.cnf如下

language-bash
1
2
3
4
5
[mysqld]
bind-address=0.0.0.0
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql

运行以下命令启动docker-compose

language-bash
1
docker-compose -p multi-cache up -d

之后使用数据库连接工具连接mysql容器,创建heima数据库,并对其执行docker/mysql/item.sql脚本。

三、项目工程准备

idea打开item-service文件夹,等待idea加载本springboot项目。

如果在docker-compose中服务ip改动,请注意一些可能关联的地方也需要做同样改动,比如item-serviceapplication.yml

language-yaml
1
2
3
4
5
6
7
8
spring:
application:
name: itemservice
datasource:
url: jdbc:mysql://172.30.3.2:3306/heima?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 1009
driver-class-name: com.mysql.cj.jdbc.Driver

观察controller

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.heima.item.web;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.pojo.PageDTO;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.stream.Collectors;

@RestController
@RequestMapping("item")
public class ItemController {


@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;

@GetMapping("list")
public PageDTO queryItemPage(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "5") Integer size){

// 分页查询商品
Page<Item> result = itemService.query()
.ne("status", 3)
.page(new Page<>(page, size));

// 查询库存
List<Item> list = result.getRecords().stream().peek(item -> {

ItemStock stock = stockService.getById(item.getId());
item.setStock(stock.getStock());
item.setSold(stock.getSold());
}).collect(Collectors.toList());

// 封装返回
return new PageDTO(result.getTotal(), list);
}

@PostMapping
public void saveItem(@RequestBody Item item){

itemService.saveItem(item);
}

@PutMapping
public void updateItem(@RequestBody Item item) {

itemService.updateById(item);
}

@PutMapping("stock")
public void updateStock(@RequestBody ItemStock itemStock){

stockService.updateById(itemStock);
}

@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id){

itemService.update().set("status", 3).eq("id", id).update();
}

@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id){

return itemService.query()
.ne("status", 3).eq("id", id)
.one();
}

@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id){

return stockService.getById(id);
}
}