Docker Compose部署Kafka集群并在宿主机Windows连接开发

Docker Compose部署Kafka集群并在宿主机Windows连接开发

Docker for Windows 4.23.0
windows 11
Java 17

1. 常用参数

kafka容器常用参数如下

  • -e KAFKA_BROKER_ID=1:设置 Kafka broker 的 ID 为 1。每个 Kafka broker 都需要一个唯一的 ID。

  • -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181:指定 Kafka 连接到 Zookeeper 的地址,这里假设 Zookeeper 容器的名称为 zookeeper,并且它在 2181 端口监听。

  • -e ALLOW_PLAINTEXT_LISTENER=yes:允许 Kafka 使用纯文本监听器。即允许非加密的通信。

  • -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:Kafka broker 实际监听在容器内的 0.0.0.0:9092 上。这意味着 Kafka 接受来自任何网络接口的连接。

  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092:指定 Kafka 广播其监听器地址,客户端将使用该地址连接到 broker。在这个例子中,Kafka 广播它在 localhost:9092 上监听。

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:指定 Kafka 使用的监听器协议映射。例如:PLAINTEXT:PLAINTEXT,SSL:SSL

  • KAFKA_INTER_BROKER_LISTENER_NAME:指定 broker 间通信使用的监听器名称。例如:PLAINTEXT

2. 理解参数和原理

KAFKA_LISTENERS是broker实际监听的地址。

KAFKA_ADVERTISED_LISTENERS是broker注册在zookeeper或者controller broker里面的元数据,当消费者或者生产者使用Bootstrap-Server去连接kafka集群时,集群会返回元数据等信息到客户端,客户端会根据每个broker提供的KAFKA_ADVERTISED_LISTENERS去连接对应的broker。

所以首先,集群之间,broker之间需要通信,所以每个kafka容器需要设置一个KAFKA_ADVERTISED_LISTENERS用于告诉别的容器如何连接到自己,如果容器都是处于同一bridge网络,那么直接使用容器名即可。

其次,我们想要在宿主机比如windows的idea开发,我们一般只能通过docker容器-p暴露的端口去连接kafka,所以每个kafka容器还需要设置一个KAFKA_ADVERTISED_LISTENERS来告诉宿主机的客户端,如何连接到自己,这里需要使用localhost+暴露在宿主机的端口。

那么如果KAFKA_ADVERTISED_LISTENERS里面有2个地址,如何保证broker之间的连接使用的是容器名,而宿主机客户端使用的是localhost呢?

这需要KAFKA_INTER_BROKER_LISTENER_NAME来指定前者。

并且由于KAFKA_ADVERTISED_LISTENERS里面有2个地址,所以我们还需要KAFKA_LISTENER_SECURITY_PROTOCOL_MAP来映射监听器名字。

3. Docker Compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
version: '3.8'

services:
zookeeper:
image: bitnami/zookeeper:3.8.2
container_name: zookeeper
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- kafka

kafka1:
image: bitnami/kafka:3.6.1
container_name: kafka1
depends_on:
- zookeeper
ports:
- "19092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka1:9093,EXTERNAL://localhost:19092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
networks:
- kafka

kafka2:
image: bitnami/kafka:3.6.1
container_name: kafka2
depends_on:
- zookeeper
ports:
- "29092:9092"
environment:
- KAFKA_BROKER_ID=2
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka2:9093,EXTERNAL://localhost:29092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
networks:
- kafka

kafka3:
image: bitnami/kafka:3.6.1
container_name: kafka3
depends_on:
- zookeeper
ports:
- "39092:9092"
environment:
- KAFKA_BROKER_ID=3
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka3:9093,EXTERNAL://localhost:39092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
networks:
- kafka

networks:
kafka:
driver: bridge

可以看到每个容器都设置了INTERNAL,因为指定了KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL,所以这是用于broker之间的连接,其监听在本地的0.0.0.0:9093,广播给其它broker的通信地址是<容器名>:9093,使用PLAINTEXT(明文)方式通信。

除此之外还设置了EXTERNAL,监听在本地的0.0.0.0:9092,广播给客户端的地址是localhost:19092、localhost:29092、localhost:39092,也就是windows上的客户端通过localhost:19092访问broker,这会被docker的-p映射到对应容器的9092,被0.0.0.0:9092对接。

4. 验证

连接到某个容器。创建test主题。

1
kafka-topics.sh --create --topic test --partitions 3 --replication-factor 3 --bootstrap-server kafka1:9093

查看分区和副本情况,可以看到在不同的broker上,输出中显示的是Broker ID。

1
2
3
4
5
I have no name!@7212060b6e3d:/$ kafka-topics.sh --describe --topic test --bootstrap-server kafka1:9093
Topic: test TopicId: Lo1eQ6aCQj6WiFcNiVBrcw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

引入pom包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
<!-- SLF4J API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- Logback classic (SLF4J implementation) -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>

生产者代码,通过localhost:19092连接到集群。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.dragon.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;

public class KafkaProducerTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test",
"key"+i,
"我是你爹"+i
);
//发送record
producer.send(record);
Thread.sleep(500);
}

//关闭producer
producer.close();
}
}

消费者代码,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package org.dragon.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.HashMap;

public class KafkaConsumerTest {
public static void main(String[] args) {

// 创建消费者对象
HashMap<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

config.put(ConsumerConfig.GROUP_ID_CONFIG, "mkl");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
// 消费者订阅主题
kafkaConsumer.subscribe(Collections.singletonList("test"));

try {
while (true){
// 消费者拉取消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
records.forEach(System.out::println);
}
}finally {
// 消费者关闭
kafkaConsumer.close();
}
}
}

都启动后,消费者和生产者日志正常。

docker部署简单的Kafka

docker部署简单的Kafka

1. 拉取镜像

选择一组兼容性好的版本。

1
2
docker pull bitnami/kafka:3.6.1
docker pull bitnami/zookeeper:3.8.2

2. 运行

创建网络

首先,创建一个名为 kafka 的 Docker bridge 网络:

1
docker network create kafka

运行 ZooKeeper 容器

然后,运行 ZooKeeper 容器并将其连接到 kafka 网络:

1
docker run -d --name zookeeper --network kafka -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.8.2

运行 Kafka 容器

最后,运行 Kafka 容器并将其连接到 kafka 网络:

1
docker run -d --name kafka --network kafka -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 bitnami/kafka:3.6.1

这些命令将使 ZooKeeper 和 Kafka 容器在同一个 Docker 网络中运行,并确保它们可以相互通信。

3. 简单的校验

要判断 ZooKeeper 和 Kafka 容器是否正常运行,可以通过以下几个步骤进行检查:

1. 检查容器状态

首先,检查 ZooKeeper 和 Kafka 容器是否正在运行:

1
docker ps

输出应包含类似以下内容:

1
2
3
CONTAINER ID   IMAGE                        COMMAND                  CREATED          STATUS          PORTS                                        NAMES
<zookeeper_id> bitnami/zookeeper:3.8.2 "/opt/bitnami/script…" <some_time_ago> Up <some_time> 2181/tcp zookeeper
<kafka_id> bitnami/kafka:3.6.1 "/opt/bitnami/script…" <some_time_ago> Up <some_time> 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka

2. 检查 ZooKeeper 日志

查看 ZooKeeper 容器的日志,以确保它已成功启动并正在运行:

1
docker logs zookeeper

日志中应包含类似以下内容:

1
2
INFO  Started AdminServer on address 0.0.0.0, port 8080
INFO binding to port 0.0.0.0/0.0.0.0:2181

3. 检查 Kafka 日志

查看 Kafka 容器的日志,以确保它已成功连接到 ZooKeeper 并正在运行:

1
docker logs kafka

日志中应包含类似以下内容:

1
2
INFO  [KafkaServer id=1] started (kafka.server.KafkaServer)
INFO [ZooKeeperClient] Connected. (org.apache.zookeeper.ClientCnxn)

4. 使用 Kafka 命令行工具检查

进入 Kafka 容器内部,并使用 Kafka 命令行工具检查 Kafka 和 ZooKeeper 的状态:

1
2
3
4
docker exec -it kafka /bin/bash

# 列出 Kafka 主题
kafka-topics.sh --list --bootstrap-server kafka:9092

如果可以成功列出 Kafka 主题,则表示 Kafka 和 ZooKeeper 正常运行。

5. 创建和删除测试主题

可以尝试创建一个测试主题,并查看是否成功:

1
2
3
4
5
6
7
8
# 创建一个名为 test-topic 的主题
kafka-topics.sh --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server kafka:9092

# 列出所有主题,确认 test-topic 是否存在
kafka-topics.sh --list --bootstrap-server kafka:9092

# 删除 test-topic 主题
kafka-topics.sh --delete --topic test-topic --bootstrap-server kafka:9092

通过以上步骤,可以确认 ZooKeeper 和 Kafka 容器是否正常运行并相互通信。

Docker部署MySQL8主从模式

Docker部署MySQL8主从模式

文章目录


Mysql 8.1.0
Docker 24.0.5

关于主从模式,Mysql8.0一些版本开始,有许多变化,这里使用8.1.0

一、运行容器

新建两个MySQL文件夹,分别新建data文件夹和conf/my.cnf文件。
根据需要理解并更改以下脚本。

language-bash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/bin/bash

containerName="MyWeb02-MySQL"
MySQLData="/root/MyWeb02/MySQL/data"
MySQLConf="/root/MyWeb02/MySQL/conf/my.cnf"

containerSlaveName="MyWeb02-MySQL-Slave"
MySQLSlaveData="/root/MyWeb02/MySQL-Slave/data"
MySQLSlaveConf="/root/MyWeb02/MySQL-Slave/conf/my.cnf"

docker run -d --name "$containerName" \
-p 3307:3306 \
-v "$MySQLData":/var/lib/mysql \
-v "$MySQLConf":/etc/mysql/my.cnf \
-e MYSQL_ROOT_PASSWORD=20028888 \
mysql:8

docker run -d --name "$containerSlaveName" \
-p 3308:3306 \
-v "$MySQLSlaveData":/var/lib/mysql \
-v "$MySQLSlaveConf":/etc/mysql/my.cnf \
-e MYSQL_ROOT_PASSWORD=20028888 \
mysql:8

主节点的my.cnf容器如下

language-bash
1
2
3
[mysqld]
server-id=1
log_bin=mysql-bin

从节点的my.cnf容器如下

language-bash
1
2
[mysqld]
server-id=2

运行脚本。

二、配置主从

到主节点命令行,运行以下命令

language-sql
1
2
CREATE USER 'replica'@'%' IDENTIFIED WITH mysql_native_password BY 'replica';
GRANT REPLICATION SLAVE ON *.* TO 'replica'@'%';

到从节点命令行,运行以下命令

language-sql
1
2
3
4
5
6
7
CHANGE REPLICATION SOURCE TO 
SOURCE_HOST='172.17.0.6',
SOURCE_PORT=3306,
SOURCE_USER='replica',
SOURCE_PASSWORD='replica';
START REPLICA; //开启备份
SHOW REPLICA STATUS\G //查看主从情况

其中SOURCE_HOST为主节点容器的ip
查看主从情况时,主要注意下面两个字段是否为Yes。不是的话,就有问题,读docker logs然后去解决它。

language-bash
1
2
Slave_IO_Running: Yes
Slave_SQL_Running: Yes

Navicat等第三方软件可能不支持\G,结果以行显示。

三、测试效果

在主节点新建一个数据库

language-bash
1
create database `test`;

随后可以在从节点也看到效果。

Redis之缓存穿透问题解决方案实践SpringBoot3+Docker

Redis之缓存穿透问题解决方案实践SpringBoot3+Docker

一、介绍

当一种请求,总是能越过缓存,调用数据库,就是缓存穿透。

比如当请求一个数据库没有的数据,那么缓存也不会有,然后就一直请求,甚至高并发去请求,对数据库压力会增大。

二、方案介绍

  1. 如果key具有某种规则,那么可以对key增加校验机制,不符合直接返回。
  2. Redisson布隆过滤器
  3. 逻辑修改,当数据库没有此数据,以nullvalue,也插入redis缓存,但设置较短的过期时间。

三、Redis Docker部署

docker-compose示例如下,redis.conf从这里下载

language-yml
1
2
3
4
5
6
7
8
redis:
container_name: redis
image: redis:7.2
volumes:
- ./redis/redis.conf:/usr/local/etc/redis/redis.conf
ports:
- "6379:6379"
command: [ "redis-server", "/usr/local/etc/redis/redis.conf" ]

四、SpringBoot3 Base代码

1. 依赖配置

language-xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis 连接线程池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.24.3</version>
</dependency>
language-yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring: 
data:
redis:
host: 192.168.101.65 # Redis服务器的主机名或IP地址
port: 6379 # Redis服务器的端口号
password: # 用于连接Redis服务器的密码
database: 0 # 要连接的Redis数据库的索引号
lettuce:
pool:
max-active: 20 # 连接池中最大的活跃连接数
max-idle: 10 # 连接池中最大的空闲连接数
min-idle: 0 # 连接池中最小的空闲连接数
timeout: 10000 # 连接超时时间(毫秒)
lock-watchdog-timeout: 100 # Redisson的分布式锁的看门狗超时时间(毫秒)

2. 基本代码

要演示的代码很简单,就是一个携带courseId请求过来,调用下面的service函数,然后查询数据库。

language-java
1
2
3
4
5
@Override
public CoursePublish getCoursePublish(Long courseId) {

return coursePublishMapper.selectById(courseId);
}

当我们使用redis改造时,基本代码如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public CoursePublish getCoursePublishCache(Long courseId) {

String key = "content:course:publish:" + courseId;
//先查询redis
Object object = redisTemplate.opsForValue().get(key);
if (object != null){

String string = object.toString();
CoursePublish coursePublish = JSON.parseObject(string, CoursePublish.class);
return coursePublish;
}else {

//后查询数据库
CoursePublish coursePublish = getCoursePublish(courseId);
if (coursePublish != null){

redisTemplate.opsForValue().set(key, JSON.toJSONString(coursePublish));
}
return coursePublish;
}
}

五、缓存优化代码

1. 校验机制

我这里的id没规则,所以加不了,跳过。

2. 布隆过滤器

读取yaml配置

language-java
1
2
3
4
5
6
7
8
9
10
11
12
@Data
@Component
@ConfigurationProperties(prefix = "spring.data.redis")
public class RedisProperties {

private String host;
private int port;
private String password;
private int database;
private int lockWatchdogTimeout;
}

配置RedissonClient

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@Configuration
public class RedissionConfig {


@Autowired
private RedisProperties redisProperties;

@Bean
public RedissonClient redissonClient() {

RedissonClient redissonClient;

Config config = new Config();
//starter依赖进来的redisson要以redis://开头,其他不用
String url = "redis://"+ redisProperties.getHost() + ":" + redisProperties.getPort();
config.useSingleServer().setAddress(url)
//.setPassword(redisProperties.getPassword())
.setDatabase(redisProperties.getDatabase());

try {

redissonClient = Redisson.create(config);
return redissonClient;
} catch (Exception e) {

log.error("RedissonClient init redis url:[{}], Exception:", url, e);
return null;
}
}
}

把布隆过滤器加到service,如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private RBloomFilter<String> bloomFilter;

@PostConstruct
public void init(){

//初始化布隆过滤器
bloomFilter = redissonClient.getBloomFilter("bloom-filter");
bloomFilter.tryInit(100, 0.003);
List<CoursePublish> coursePublishList = coursePublishMapper.selectList(new LambdaQueryWrapper<CoursePublish>());
coursePublishList.forEach(coursePublish -> {

String key = "content:course:publish:" + coursePublish.getId();
bloomFilter.add(key);
});
}

@Override
public CoursePublish getCoursePublishCache(Long courseId) {

String key = "content:course:publish:" + courseId;
//布隆过滤器
boolean contains = bloomFilter.contains(key);
if (!contains){

return null;
}
//先查询redis
Object object = redisTemplate.opsForValue().get(key);
if (object != null){

String string = object.toString();
CoursePublish coursePublish = JSON.parseObject(string, CoursePublish.class);
return coursePublish;
}else {

//后查询数据库
CoursePublish coursePublish = getCoursePublish(courseId);
if (coursePublish != null){

bloomFilter.add(key);
redisTemplate.opsForValue().set(key, JSON.toJSONString(coursePublish));
}
return coursePublish;
}
}

3. 逻辑优化

当数据库没有此数据,以nullvalue,也插入redis缓存,但设置较短的过期时间。

language-java
1
2
3
4
5
6
7
8
9
10
11
//后查询数据库
CoursePublish coursePublish = getCoursePublish(courseId);
if (coursePublish != null) {

bloomFilter.add(key);
redisTemplate.opsForValue().set(key, JSON.toJSONString(coursePublish));
}else {

redisTemplate.opsForValue().set(key, JSON.toJSONString(coursePublish), 10, TimeUnit.SECONDS);
}
return coursePublish;
Docker部署xxl-job调度器并结合SpringBoot测试

Docker部署xxl-job调度器并结合SpringBoot测试

一、Docker部署

1. 创建数据库

去Github下载最新发布的源码,https://github.com/xuxueli/xxl-job/releases,找到/xxl-job/doc/db/tables_xxl_job.sql文件,对数据库进行执行即可,脚本里面包含数据库的创建。

2. 启动容器

参考官方中文文档,写出如下docker-compose示例。使用-e PARAMS: ""来指定一些变量,包括数据库信息,一般需要根据自身情况修改。

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
version: "3.8"
networks:
docker_xuecheng:
ipam:
config:
- subnet: 172.20.0.0/16

services:
xxl-job:
container_name: xxl-job
image: xuxueli/xxl-job-admin:2.4.0
volumes:
- ./xxl_job/logs:/data/applogs
ports:
- "8088:8080"
environment:
PARAMS: '
--spring.datasource.url=jdbc:mysql://172.20.0.2:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
--spring.datasource.username=root
--spring.datasource.password=1009'
networks:
docker_xuecheng:
ipv4_address: 172.20.3.1

3. 访问

访问http://192.168.101.65:8088/xxl-job-admin/即可。

4. 新建执行器

新增一个简单的testHandler执行器。

二、SpringBoot整合

1. 模块注册到执行器

在对应模块引入依赖

language-xml
1
2
3
4
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>

并指定执行器的appname

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
xxl:
job:
admin:
addresses: http://192.168.101.65:8088/xxl-job-admin
executor:
appname: testHandler
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token

2. 创建配置类

在源码中找到src/main/java/com/xxl/job/executor/core/config/XxlJobConfig.java,复制到模块代码中。如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.xuecheng.media.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {

private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {

logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}

/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/


}

3. 启动测试

重启模块,访问XXL-JOB网页端,查看情况。如果执行器的OnLine 机器地址有一个信息,表示模块绑定成功。

三、任务发布-普通任务

1. 编写任务代码

源代码中有任务代码示例,路径为src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java,仿照写一个简单的任务,如下。

language-java
1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class SampleXxlJob {

/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {

System.out.println("处理视频");
}
}

2. 创建任务

选择执行器,并指定JobHandler

3. 启动任务

启动刚才创建的任务

对应模块的日志可以看到每10秒打印一次输出。

XXL-JOB网页管理也可以看到相关任务执行记录。

四、任务发布-分片任务

1. 编写任务代码

language-java
1
2
3
4
5
6
7
8
9
10
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

System.out.println("分片参数:当前分片序号 = " + shardIndex + ", 总分片数 = " + shardTotal);

}

2. 启动多个实例

添加虚拟机参数-Dserver.port=63051 -Dxxl.job.executor.port=9998,前者区分程序端口,后者区分执行器端口。

3. 创建任务

创建任务之前,检查一下两个模块是否注册到指定执行器。

随后创建任务,指定执行器JobHandler,同时路由策略选择分片广播

4. 启动任务

启动任务后,观察两个模块的日志。

同时任务记录也在XXL-JOB管理网页中可以查询到。

五、动态扩容

当运行分片任务时,又添加一个新的模块示例,此时分片任务会自动扩容再分配。如图,我们再复制一个运行配置。

然后将其运行,等待一会,执行器可以看到有3个绑定的机器。

新增的运行实例日志如下,

同时,先前两个运行实例的日志发送了变化,如下

参考资料

Docker多节点部署Minio分布式文件系统并测试

Docker多节点部署Minio分布式文件系统并测试

一、前提准备

准备如下文件夹和文件

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
./
├── docker-compose-minio.yml
├── .env
├── env
│ ├── minio.env
├── minio
│ ├── minio1
│ │ ├── data1
│ │ └── data2
│ ├── minio2
│ │ ├── data1
│ │ └── data2
│ ├── minio3
│ │ ├── data1
│ │ └── data2
│ └── minio4
│ ├── data1
│ └── data2

二、文件配置

1. .env

language-env
1
MINIO_VERSION=RELEASE.2024-01-29T03-56-32Z

2. env/minio.env

language-env
1
2
MINIO_ROOT_USER=minio
MINIO_ROOT_PASSWORD=minio123

3. docker-compose-minio.yml

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
version: "3.8"
networks:
docker_xuecheng:
ipam:
config:
- subnet: 172.20.0.0/16

services:
minio1:
container_name: minio1
image: minio/minio:${
MINIO_VERSION}
volumes:
- ./minio/minio1/data1:/data1
- ./minio/minio1/data2:/data2
ports:
- "9001:9000"
- "9011:9001"
env_file:
- ./env/minio.env
command: server --address ":9000" --console-address ":9001" http://172.20.2.{
1...4}/data{
1...2}
networks:
docker_xuecheng:
ipv4_address: 172.20.2.1

minio2:
container_name: minio2
image: minio/minio:${
MINIO_VERSION}
volumes:
- ./minio/minio2/data1:/data1
- ./minio/minio2/data2:/data2
ports:
- "9002:9000"
- "9012:9001"
env_file:
- ./env/minio.env
command: server --address ":9000" --console-address ":9001" http://172.20.2.{
1...4}/data{
1...2}
networks:
docker_xuecheng:
ipv4_address: 172.20.2.2

minio3:
container_name: minio3
image: minio/minio:${
MINIO_VERSION}
volumes:
- ./minio/minio3/data1:/data1
- ./minio/minio3/data2:/data2
ports:
- "9003:9000"
- "9013:9001"
env_file:
- ./env/minio.env
command: server --address ":9000" --console-address ":9001" http://172.20.2.{
1...4}/data{
1...2}
networks:
docker_xuecheng:
ipv4_address: 172.20.2.3

minio4:
container_name: minio4
image: minio/minio:${
MINIO_VERSION}
volumes:
- ./minio/minio4/data1:/data1
- ./minio/minio4/data2:/data2
ports:
- "9004:9000"
- "9014:9001"
env_file:
- ./env/minio.env
command: server --address ":9000" --console-address ":9001" http://172.20.2.{
1...4}/data{
1...2}
networks:
docker_xuecheng:
ipv4_address: 172.20.2.4

三、测试

访问宿主机ip:9011,输入账号密码。

language-txt
1
2
MINIO_ROOT_USER=minio
MINIO_ROOT_PASSWORD=minio123

点到Monitoring -> Metrics

四、Java测试

1. 引入依赖

language-xml
1
2
3
4
5
6
7
8
9
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

2. 增删改

在这之前先去网页端,创建一个Bucket

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.xuecheng.media;

import io.minio.*;
import io.minio.errors.*;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.junit.jupiter.api.Test;

import java.io.*;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

public class MinioTest {

private MinioClient minioClient = MinioClient.builder()
.endpoint("http://192.168.101.65:9001") //改成你的宿主机ip
.credentials("minio", "minio123")
.build();

@Test
public void testCreate() throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {

ObjectWriteResponse file = minioClient.uploadObject(
UploadObjectArgs.builder()
.bucket("test")
.filename("C:\\Users\\mumu\\Desktop\\1C6091EF9671978A9F1B6C6F8A3666FD.png")
.object("1.png")
.build()
);
}

@Test
public void testDelete() throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {

minioClient.removeObject(
RemoveObjectArgs.builder()
.bucket("test")
.object("12.msi")
.build()
);
}

@Test
public void testGet() throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {

InputStream inputStream = minioClient.getObject(
GetObjectArgs.builder()
.bucket("test")
.object("1.png")
.build()
);
FileOutputStream outputStream = new FileOutputStream(new File("C:\\Users\\mumu\\Desktop\\2.png"));
IOUtils.copy(inputStream, outputStream);
}

}

Docker运行RabbitMQ并使用SpringAMQP操作

Docker运行RabbitMQ并使用SpringAMQP操作

一、RabbitMQ运行

拉取docker镜像

language-bash
1
docker pull rabbitmq:3-management

基础运行命令

language-bash
1
2
3
4
5
6
7
8
docker run \
-e RABBITMQ_DEFAULT_USER=rabbitmq \
-e RABBITMQ_DEFAULT_PASS=rabbitmq \
--name rabbitmq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

15672是网页后台管理系统,5672是给服务用的。

官方入门教程可以看这里RabbitMQ Tutorials — RabbitMQ

二、整合SpringAMQP

1. 引入依赖

language-xml
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
language-yml
1
2
3
4
5
6
spring:
rabbitmq:
host: localhost # rabbitMQ的ip地址
port: 5672 # rabbitMQ服务端口
username: rabbitmq
password: rabbitmq

三、测试

这边采用常用的消费者-生产者模型,使用默认的Direct类型exchange。不懂的可以先继续学习rabbitmq再来实践。

1. 消费者

在消费者服务随便新建一个listener

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@Component
public class SpringRabbitListener {


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "simple.queue"),
exchange = @Exchange(name = "simple.exchange"),
key = "simple"
))
public void listenSimpleQueue(String msg) {

log.info("消费者接收到simple.queue的消息:【" + msg + "】");
}
}

2. 生产者

在生产者服务的Test模块新建一个测试

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {

String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend("simple.exchange", "simple", message);
}
}

3. 运行

先启动消费者。
登录http://localhost:15672/,可以看到simple.exchangesimple.queue已被创建。

然后启动测试testSendMessage2SimpleQueue,出现类似以下日志,消息发送成功。

language-txt
1
2
12:17:43:771  INFO 21064 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
12:17:43:808 INFO 21064 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#50f40653:0/SimpleConnection@536d97f8 [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 59641]

消费者出现类似以下日志,收到消息。

language-txt
1
2
3
12:17:31:074  INFO 8924 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a27cb34:0/SimpleConnection@671facee [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 59634]
12:17:31:141 INFO 8924 --- [ main] cn.itcast.mq.ConsumerApplication : Started ConsumerApplication in 1.011 seconds (JVM running for 1.462)
12:17:43:848 INFO 8924 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消费者接收到simple.queue的消息:【hello, spring amqp!】
多级缓存架构(五)缓存同步

多级缓存架构(五)缓存同步

通过本文章,可以完成多级缓存架构中的缓存同步。

一、Canal服务

1. mysql添加canal用户

连接在上一次multiCache项目中运行的mysql容器,创建canal用户。

language-sql
1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

2. mysql配置文件

docker/mysql/conf/my.cnf添加如下配置

language-txt
1
2
3
4
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima
binlog_format=row

3. canal配置文件

添加canal服务块到docker-compose.yml,如下

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
canal:
container_name: canal
image: canal/canal-server:v1.1.7
volumes:
- ./canal/logs:/home/admin/canal-server/logs
- ./canal/conf:/home/admin/canal-server/conf
ports:
- "11111:11111"
depends_on:
- mysql
networks:
multi-cache:
ipv4_address: 172.30.3.7
language-bash
1
docker pull canal/canal-server:v1.1.7

任意启动一个canal-server容器,将里面的/home/admin/canal-server/conf文件夹复制到宿主机,对应docker/canal/conf文件夹。
删除此临时容器。

修改docker/canal/conf/canal.properties如下条目

language-dart
1
2
canal.destinations=example
canal.instance.tsdb.enable=true

修改docker/canal/conf/example/instance.properties如下条目

language-dart
1
2
3
4
5
6
7
canal.instance.master.address=172.30.3.2:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.tsdb.enable=true
canal.instance.gtidon=false
canal.instance.filter.regex=heima\\..*

二、引入依赖

pom.xml

language-xml
1
2
3
4
5
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>

application.yml

language-yml
1
2
3
canal:
destination: example
server: 172.30.3.7:11111

三、监听Canal消息

这是canal-spring-boot-starter官方仓库,含使用文档

新建canal.ItemHandler类,内容如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.heima.item.canal;

import com.github.benmanes.caffeine.cache.Cache;
import com.heima.item.config.RedisHandler;
import com.heima.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@CanalTable(value = "tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {


@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long, Item> itemCache;

@Override
public void insert(Item item) {

itemCache.put(item.getId(), item);
redisHandler.saveItem(item);
}

@Override
public void update(Item before, Item after) {

itemCache.put(after.getId(), after);
redisHandler.saveItem(after);
}

@Override
public void delete(Item item) {

itemCache.invalidate(item.getId());
redisHandler.deleteItemById(item.getId());
}
}

修改pojo.Item类,如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.heima.item.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;

import java.util.Date;

@Data
@TableName("tb_item")
public class Item {

@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}

四、运行

到此为止,docker-compose.yml内容应该如下

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
version: '3.8'

networks:
multi-cache:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.30.3.0/24

services:
mysql:
container_name: mysql
image: mysql:8
volumes:
- ./mysql/conf/my.cnf:/etc/mysql/conf.d/my.cnf
- ./mysql/data:/var/lib/mysql
- ./mysql/logs:/logs
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=1009
networks:
multi-cache:
ipv4_address: 172.30.3.2

nginx:
container_name: nginx
image: nginx:stable
volumes:
- ./nginx/conf/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./nginx/dist:/usr/share/nginx/dist
ports:
- "8080:8080"
networks:
multi-cache:
ipv4_address: 172.30.3.3

canal:
container_name: canal
image: canal/canal-server:v1.1.7
volumes:
- ./canal/logs:/home/admin/canal-server/logs
- ./canal/conf:/home/admin/canal-server/conf
ports:
- "11111:11111"
depends_on:
- mysql
networks:
multi-cache:
ipv4_address: 172.30.3.7

openresty1:
container_name: openresty1
image: openresty/openresty:1.21.4.3-3-jammy-amd64
volumes:
- ./openresty1/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
- ./openresty1/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./openresty1/lua:/usr/local/openresty/nginx/lua
- ./openresty1/lualib/common.lua:/usr/local/openresty/lualib/common.lua
networks:
multi-cache:
ipv4_address: 172.30.3.11

redis:
container_name: redis
image: redis:7.2
volumes:
- ./redis/redis.conf:/usr/local/etc/redis/redis.conf
ports:
- "6379:6379"
command: [ "redis-server", "/usr/local/etc/redis/redis.conf" ]
networks:
multi-cache:
ipv4_address: 172.30.3.21

删除原来的multiCache,重新启动各项服务。

language-bash
1
docker-compose -p multi-cache up -d

启动springboot程序。

五、测试

springboot不断输入类似如下日志,属于正常监听canal消息中。

language-txt
1
2
3
09:27:17:175  INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient  : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]
09:27:18:177 INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]
09:27:19:178 INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]

访问http://localhost:8081/item/10001,此时信息为tomcat查询数据库所得数据,而后存入Caffeine缓存。
访问http://localhost:8080/item.html?id=10001,此时信息为Redis缓存数据。

然后,
访问http://localhost:8081/来到商品管理页面。

修改id=10001的数据的商品分类

确认后
springboot日志出现类似如下日志

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
09:31:29:234  INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient  : 获取消息 Message[id=1,entries=[header {
version: 1
logfileName: "binlog.000007"
logfileOffset: 236
serverId: 1
serverenCode: "UTF-8"
executeTime: 1705051889000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 93
}
entryType: TRANSACTIONBEGIN
storeValue: " \r"
, header {
version: 1
logfileName: "binlog.000007"
logfileOffset: 411
serverId: 1
serverenCode: "UTF-8"
executeTime: 1705051889000
sourceType: MYSQL
schemaName: "heima"
tableName: "tb_item"
eventLength: 626
eventType: UPDATE
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA
storeValue: "\bV\020\002P\000b\332\n\n&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\00510001R\006bigint\nd\b\001\020\f\032\005title \000(\0000\000BCRIMOWA 21\345\257\270\346\211\230\350\277\220\347\256\261\346\213\211\346\235\206\347\256\261 SALSA AIR\347\263\273\345\210\227\346\236\234\347\273\277\350\211\262 820.70.36.4R\fvarchar(264)\n)\b\002\020\f\032\004name \000(\0000\000B\tSALSA AIRR\fvarchar(128)\n)\b\003\020\373\377\377\377\377\377\377\377\377\001\032\005price \000(\0000\000B\00516900R\006bigint\n\226\001\b\004\020\f\032\005image \000(\0000\000Buhttps://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webpR\fvarchar(200)\n0\b\005\020\f\032\bcategory \000(\0000\000B\f\346\213\211\346\235\206\347\256\261777R\fvarchar(200)\n\'\b\006\020\f\032\005brand \000(\0000\000B\006RIMOWAR\fvarchar(100)\nG\b\a\020\f\032\004spec \000(\0000\000B\'{\"\351\242\234\350\211\262\": \"\347\272\242\350\211\262\", \"\345\260\272\347\240\201\": \"26\345\257\270\"}R\fvarchar(200)\n\032\b\b\020\004\032\006status \000(\0000\000B\0011R\003int\n6\b\t\020]\032\vcreate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime\n6\b\n\020]\032\vupdate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime\022&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\00510001R\006bigint\022d\b\001\020\f\032\005title \000(\0000\000BCRIMOWA 21\345\257\270\346\211\230\350\277\220\347\256\261\346\213\211\346\235\206\347\256\261 SALSA AIR\347\263\273\345\210\227\346\236\234\347\273\277\350\211\262 820.70.36.4R\fvarchar(264)\022)\b\002\020\f\032\004name \000(\0000\000B\tSALSA AIRR\fvarchar(128)\022)\b\003\020\373\377\377\377\377\377\377\377\377\001\032\005price \000(\0000\000B\00516900R\006bigint\022\226\001\b\004\020\f\032\005image \000(\0000\000Buhttps://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webpR\fvarchar(200)\0220\b\005\020\f\032\bcategory \000(\0010\000B\f\346\213\211\346\235\206\347\256\261888R\fvarchar(200)\022\'\b\006\020\f\032\005brand \000(\0000\000B\006RIMOWAR\fvarchar(100)\022G\b\a\020\f\032\004spec \000(\0000\000B\'{\"\351\242\234\350\211\262\": \"\347\272\242\350\211\262\", \"\345\260\272\347\240\201\": \"26\345\257\270\"}R\fvarchar(200)\022\032\b\b\020\004\032\006status \000(\0000\000B\0011R\003int\0226\b\t\020]\032\vcreate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime\0226\b\n\020]\032\vupdate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime"
],raw=false,rawEntries=[]]
09:31:30:572 INFO 1 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=2,entries=[header {
version: 1
logfileName: "binlog.000007"
logfileOffset: 1037
serverId: 1
serverenCode: "UTF-8"
executeTime: 1705051889000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\00287"
],raw=false,rawEntries=[]]

这里可以先用redis连接工具查询数据,发现rediis已被更新。

再次访问http://localhost:8081/item/10001直接向springbootcontroller发送请求,发现caffeine数据更新,并且springboot日志没有出现查询记录,说明走的是caffeine

多级缓存架构(三)OpenResty Lua缓存

多级缓存架构(三)OpenResty Lua缓存

通过本文章,可以完成多级缓存架构中的Lua缓存。

一、nginx服务

docker/docker-compose.yml中添加nginx服务块。

language-yaml
1
2
3
4
5
6
7
8
9
10
11
12
nginx:
container_name: nginx
image: nginx:stable
volumes:
- ./nginx/conf/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./nginx/dist:/usr/share/nginx/dist
ports:
- "8080:8080"
networks:
multi-cache:
ipv4_address: 172.30.3.3

删除原来docker里的multiCache项目并停止springboot应用。

nginx部分配置如下,监听端口为8080,并且将请求反向代理至172.30.3.11,下一小节,将openresty固定在172.30.3.11

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
upstream nginx-cluster {
server 172.30.3.11;
}

server {
listen 8080;
listen [::]:8080;
server_name localhost;

location /api {
proxy_pass http://nginx-cluster;
}
}

重新启动multiCache看看nginx前端网页效果。

language-css
1
docker-compose -p multi-cache up -d

访问http://localhost:8080/item.html?id=10001查询id=10001商品页

这里是假数据,前端页面会向/api/item/10001发送数据请求。

二、OpenResty服务

1. 服务块定义

docker/docker-compose.yml中添加openresty1服务块。

language-yaml
1
2
3
4
5
6
7
8
9
10
11
openresty1:
container_name: openresty1
image: openresty/openresty:1.21.4.3-3-jammy-amd64
volumes:
- ./openresty1/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
- ./openresty1/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./openresty1/lua:/usr/local/openresty/nginx/lua
- ./openresty1/lualib/common.lua:/usr/local/openresty/lualib/common.lua
networks:
multi-cache:
ipv4_address: 172.30.3.11

2. 配置修改

前端向后端发送/api/item/10001请求关于id=10001商品信息。

根据nginx的配置内容,这个请求首先被nginx拦截,反向代理到172.30.3.11 (即openresty1)。

language-txt
1
2
3
4
5
6
7
8
9
upstream nginx-cluster {
server 172.30.3.11;
}

server {
location /api {
proxy_pass http://nginx-cluster;
}
}

openresty1收到的也是/api/item/10001,同时,openresty/api/item/(\d+)请求代理到指定lua程序,在lua程序中完成数据缓存。

因此,openrestyconf/conf.d/default.conf如下

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
upstream tomcat-cluster {
hash $request_uri;
server 172.30.3.4:8081;
# server 172.30.3.5:8081;
}

server {
listen 80;
listen [::]:80;
server_name localhost;

# intercept /item and join lua
location ~ /api/item/(\d+) {
default_type application/json;
content_by_lua_file lua/item.lua;
}

# intercept lua and redirect to back-end
location /path/ {
rewrite ^/path/(.*)$ /$1 break;
proxy_pass http://tomcat-cluster;
}

error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/share/nginx/dist;
}
}

conf/nginx.confhttp块最后添加3行,引入依赖。

language-txt
1
2
3
4
5
6
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
#本地缓存
lua_shared_dict item_cache 150m;

3. Lua程序编写

common.lua被挂载到lualib,表示可以被其他lua当做库使用,内容如下

language-lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
-- 创建一个本地缓存对象item_cache
local item_cache = ngx.shared.item_cache;

-- 函数,向openresty本身发送类似/path/item/10001请求,根据conf配置,将被删除/path前缀并代理至tomcat程序
local function read_get(path, params)
local rsp = ngx.location.capture('/path'..path,{

method = ngx.HTTP_GET,
args = params,
})
if not rsp then
ngx.log(ngx.ERR, "http not found, path: ", path, ", args: ", params);
ngx.exit(404)
end
return rsp.body
end

-- 函数,如果本地有缓存,使用缓存,如果没有代理到tomcat然后将数据存入缓存
local function read_data(key, expire, path, params)
-- query local cache
local rsp = item_cache:get(key)
-- query tomcat
if not rsp then
ngx.log(ngx.ERR, "redis cache miss, try tomcat, key: ", key)
rsp = read_get(path, params)
end
-- write into local cache
item_cache:set(key, rsp, expire)
return rsp
end

local _M = {

read_data = read_data
}

return _M

item.lua是处理来自形如/api/item/10001请求的程序,内容如下

language-lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- include
local commonUtils = require('common')
local cjson = require("cjson")

-- get url params 10001
local id = ngx.var[1]
-- redirect item, 缓存过期时间1800s, 适合长时间不改变的数据
local itemJson = commonUtils.read_data("item:id:"..id, 1800,"/item/"..id,nil)
-- redirect item/stock, 缓存过期时间4s, 适合经常改变的数据
local stockJson = commonUtils.read_data("item:stock:id:"..id, 4 ,"/item/stock/"..id, nil)
-- json2table
local item = cjson.decode(itemJson)
local stock = cjson.decode(stockJson)
-- combine item and stock
item.stock = stock.stock
item.sold = stock.sold
-- return result
ngx.say(cjson.encode(item))

4. 总结

  1. 这里luaitem(tb_item表)和stock(tb_stock表)两个信息都有缓存,并使用cjson库将两者合并后返回到前端。
  2. 关于expire时效性的问题,如果后台改变了数据,但是openresty关于此数据的缓存未过期,前端得到的是旧数据
  3. 大致来说openresty = nginx + lua,不仅具有nginx反向代理的能力,还能介入lua程序进行扩展。

三、运行

到此为止,docker-compose.yml应该如下

language-yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
version: '3.8'

networks:
multi-cache:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.30.3.0/24

services:
mysql:
container_name: mysql
image: mysql:8
volumes:
- ./mysql/conf/my.cnf:/etc/mysql/conf.d/my.cnf
- ./mysql/data:/var/lib/mysql
- ./mysql/logs:/logs
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=1009
networks:
multi-cache:
ipv4_address: 172.30.3.2

nginx:
container_name: nginx
image: nginx:stable
volumes:
- ./nginx/conf/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./nginx/dist:/usr/share/nginx/dist
ports:
- "8080:8080"
networks:
multi-cache:
ipv4_address: 172.30.3.3

openresty1:
container_name: openresty1
image: openresty/openresty:1.21.4.3-3-jammy-amd64
volumes:
- ./openresty1/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
- ./openresty1/conf/conf.d/default.conf:/etc/nginx/conf.d/default.conf
- ./openresty1/lua:/usr/local/openresty/nginx/lua
- ./openresty1/lualib/common.lua:/usr/local/openresty/lualib/common.lua
networks:
multi-cache:
ipv4_address: 172.30.3.11

启动各项服务

language-bash
1
docker-compose -p multi-cache up -d

启动springboot程序。

四、测试

清空openresty容器日志。
访问http://localhost:8080/item.html?id=10001
查看openresty容器日志,可以看到两次commonUtils.read_data都没有缓存,于是代理到tomcat,可以看到springboot日志出现查询相关记录。

language-txt
1
2
3
2024-01-12 11:45:53 2024/01/12 03:45:53 [error] 7#7: *1 [lua] common.lua:99: read_data(): redis cache miss, try tomcat, key: item:id:10001, client: 172.30.3.3, server: localhost, request: "GET /api/item/10001 HTTP/1.0", host: "nginx-cluster", referrer: "http://localhost:8080/item.html?id=10001"
2024-01-12 11:45:53 2024/01/12 03:45:53 [error] 7#7: *1 [lua] common.lua:99: read_data(): redis cache miss, try tomcat, key: item:stock:id:10001 while sending to client, client: 172.30.3.3, server: localhost, request: "GET /api/item/10001 HTTP/1.0", host: "nginx-cluster", referrer: "http://localhost:8080/item.html?id=10001"
2024-01-12 11:45:53 172.30.3.3 - - [12/Jan/2024:03:45:53 +0000] "GET /api/item/10001 HTTP/1.0" 200 486 "http://localhost:8080/item.html?id=10001" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0"

再次访问此网址,强制刷新+禁用浏览器缓存+更换浏览器
间隔超过4s但小于1800s时,日志如下,只出现一次miss。

language-txt
1
2
2024-01-12 11:48:04 2024/01/12 03:48:04 [error] 7#7: *4 [lua] common.lua:99: read_data(): redis cache miss, try tomcat, key: item:stock:id:10001, client: 172.30.3.3, server: localhost, request: "GET /api/item/10001 HTTP/1.0", host: "nginx-cluster", referrer: "http://localhost:8080/item.html?id=10001"
2024-01-12 11:48:04 172.30.3.3 - - [12/Jan/2024:03:48:04 +0000] "GET /api/item/10001 HTTP/1.0" 200 486 "http://localhost:8080/item.html?id=10001" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0"

再次访问此网址,强制刷新+禁用浏览器缓存+更换浏览器
间隔小于4s,日志如下,未出现miss。

language-txt
1
2024-01-12 11:49:16 172.30.3.3 - - [12/Jan/2024:03:49:16 +0000] "GET /api/item/10001 HTTP/1.0" 200 486 "http://localhost:8080/item.html?id=10001" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0"

五、高可用集群

1. openresty

对于openresty高可用,可以部署多个openresty docker实例,并在nginxdocker/nginx/conf/conf.d/default.confupstream nginx-cluster将多个openresty地址添加进去即可。比如

language-txt
1
2
3
4
5
6
7
upstream nginx-cluster {
hash $request_uri;
# hash $request_uri consistent;
server 172.30.3.11;
server 172.30.3.12;
server 172.30.3.13;
}

多个openresty 无论是conf还是lua都保持一致即可。
并且使用hash $request_uri负载均衡作为反向代理策略,防止同一请求被多个实例缓存数据。

2. tomcat

对于springboot程序高可用,也是类似。可以部署多个springboot docker实例,并在openresty docker/openresty1/conf/conf.d/default.confupstream nginx-cluster将多个springboot地址添加进去即可。比如

language-txt
1
2
3
4
5
upstream tomcat-cluster {
hash $request_uri;
server 172.30.3.4:8081;
server 172.30.3.5:8081;
}
多级缓存架构(二)Caffeine进程缓存

多级缓存架构(二)Caffeine进程缓存

通过本文章,可以完成多级缓存架构中的进程缓存。

一、引入依赖

item-service中引入caffeine依赖

language-xml
1
2
3
4
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

二、实现进程缓存

这是Caffeine官方文档地址

1. 配置Config类

创建config.CaffeineConfig

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class CaffeineConfig {

@Bean
public Cache<Long, Item> itemCache(){

return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}

@Bean
public Cache<Long, ItemStock> stockCache(){

return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
}

2. 修改controller

ItemController中注入两个Cache对象,并修改业务逻辑

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@RestController
@RequestMapping("item")
public class ItemController {


@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
@Autowired
private Cache<Long, Item> itemCache;
@Autowired
private Cache<Long, ItemStock> stockCache;

@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id){

return itemCache.get(id, key->
itemService.query()
.ne("status", 3).eq("id", id)
.one()
);
// return itemService.query()
// .ne("status", 3).eq("id", id)
// .one();
}

@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id){

return stockCache.get(id, key->
stockService.getById(id)
);
// return stockService.getById(id);
}
}

三、运行

Idea结合Docker将springboot放入docker容器中运行,并指定使用multi-cache_multi-cache网络,以及固定172.30.3.4地址。
详细参考如下文章

启动好后,可以看到springboot容器和mysql容器处于同一网络下。(Docker Desktop for Windows插件PortNavigator)

四、测试

访问http://localhost:8081/item/10001可以看到springboot日志输出如下

language-txt
1
2
3
02:45:58:841 DEBUG 1 --- [nio-8081-exec-1] c.h.item.mapper.ItemMapper.selectOne     : ==>  Preparing: SELECT id,name,title,price,image,category,brand,spec,status,create_time,update_time FROM tb_item WHERE (status <> ? AND id = ?)
02:45:58:889 DEBUG 1 --- [nio-8081-exec-1] c.h.item.mapper.ItemMapper.selectOne : ==> Parameters: 3(Integer), 10001(Long)
02:45:58:951 DEBUG 1 --- [nio-8081-exec-1] c.h.item.mapper.ItemMapper.selectOne : <== Total: 1

当我们二次访问此网址,强制刷新+禁用浏览器缓存+更换浏览器,springboot日志都没有新的查询记录,说明使用了Caffeine缓存。