Kafka Producer之拦截器

Kafka Producer之拦截器

1. Producer流程

新建ProducerRecord类后,传入topic、key、value等数据构建Record之后,距离发送至kafka集群还需要经历若干过程。

  1. 拦截器列表,对数据进行过滤,更改等行为,处理异常不会导致流程终止。

  2. 获取Kafka集群元数据

  3. 对数据进行序列化

  4. 根据元数据选择分区和Broker

  5. 数据校验

  6. 进入数据发送缓存区,批次发送

  7. send

2. 代码测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class KafkaProducerInterceptorTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//指定拦截器
config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test1",
"key"+i,
"我是你爹"+i
);
//发送record
producer.send(record);
Thread.sleep(500);
}

//关闭producer
producer.close();
}
}

拦截器自定义类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package org.dragon.producer;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
* 自定义value拦截器试验<br/>
* 1. 实现接口
* 2. 定义泛型
* 3. 重写方法
*
*
* @author mumu
* @date 2024/07/15
*/
public class ValueInterceptorTest implements ProducerInterceptor<String, String> {

/**
* 发送数据时会调用这个方法<br/>
* 让value复制2次
*
* @param producerRecord 生产者记录
* @return {@link ProducerRecord}<{@link String}, {@link String}>
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
return new ProducerRecord<String, String>(producerRecord.topic(), producerRecord.key(), producerRecord.value() + producerRecord.value());
}

/**
* 发送数据完毕,服务器返回的响应,会调用此方法。
*
* @param recordMetadata 记录元数据
* @param e e
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

}

/**
* 生产者关闭,会调用此方法
*/
@Override
public void close() {

}

/**
* 创建生产者对象时调用
*
* @param map 地图
*/
@Override
public void configure(Map<String, ?> map) {

}
}

3. 测试结果

可以看到value是复制了2次,成功。

关于Kafka Topic分区和Replication分配的策略

关于Kafka Topic分区和Replication分配的策略

1. Topic多分区

如图,是一个多分区Topic在Kafka集群中可能得分配情况。

P0-RL代表分区0,Leader副本。

这个Topic是3分区2副本的配置。分区尽量均匀分在不同的Broker上,分区的Follower副本尽量不和Leader在一个Broker上。

2. 理想的策略

假设有3个Topic在含有3个Broker的Kafka集群上。

Topic1有1个分区,2个副本。

Topic2有2个分区,2个副本。

Topic3有3个分区,2个副本。

可能如下图所示。不同颜色表示不同Topic。

似乎不是特别理想,我们再调整一下,如下图

不仅每个Broker的副本数一样了,更关键的是,并且每个Broker的主Leader副本也一样的。这样更适合负载均衡。

3. 实际的策略

我们使用Kafka tool,来以此创建上述3个Topic。

首先看test1

然后看test2

然后是test3

按照上面的信息,画出来的分配结果如下图

似乎并不和我们想的一样。

查看源码,Breadcrumbskafka/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java中一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static Map<Integer, List<Integer>> assignReplicasToBrokersRackUnaware(int nPartitions,
int replicationFactor,
List<Integer> brokerList,
int fixedStartIndex,
int startPartitionId) {
Map<Integer, List<Integer>> ret = new HashMap<>();
int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size());
int currentPartitionId = Math.max(0, startPartitionId);
int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size());
for (int i = 0; i < nPartitions; i++) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0))
nextReplicaShift += 1;
int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size();
List<Integer> replicaBuffer = new ArrayList<>();
replicaBuffer.add(brokerList.get(firstReplicaIndex));
for (int j = 0; j < replicationFactor - 1; j++)
replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size())));
ret.put(currentPartitionId, replicaBuffer);
currentPartitionId += 1;
}
return ret;
}

例子(来自尚硅谷)

4. 如何自定义策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class AdminTopicTest {
public static void main(String[] args) {
//定义kafka集群配置
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");

//创建Admin管理员对象
Admin admin = Admin.create(config);

//定义Topic属性
HashMap<Integer, List<Integer>> map = new HashMap<>();
// 分区0,Leader副本在3上,第二个副本在1上。
map.put(0, Arrays.asList(3, 1));
map.put(1, Arrays.asList(2, 3));
map.put(2, Arrays.asList(1, 2));
NewTopic test4 = new NewTopic("test2", map);


//创建Topic
CreateTopicsResult result = admin.createTopics(
Arrays.asList(
test4
)
);

admin.close();
}
}

不过在手动分配时,确实需要了解每个broker的负载情况,以便做出更优的分配策略。你可以使用Kafka的AdminClient类来获取集群的状态信息

Docker Compose部署Kafka集群并在宿主机Windows连接开发

Docker Compose部署Kafka集群并在宿主机Windows连接开发

Docker for Windows 4.23.0
windows 11
Java 17

1. 常用参数

kafka容器常用参数如下

  • -e KAFKA_BROKER_ID=1:设置 Kafka broker 的 ID 为 1。每个 Kafka broker 都需要一个唯一的 ID。

  • -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181:指定 Kafka 连接到 Zookeeper 的地址,这里假设 Zookeeper 容器的名称为 zookeeper,并且它在 2181 端口监听。

  • -e ALLOW_PLAINTEXT_LISTENER=yes:允许 Kafka 使用纯文本监听器。即允许非加密的通信。

  • -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:Kafka broker 实际监听在容器内的 0.0.0.0:9092 上。这意味着 Kafka 接受来自任何网络接口的连接。

  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092:指定 Kafka 广播其监听器地址,客户端将使用该地址连接到 broker。在这个例子中,Kafka 广播它在 localhost:9092 上监听。

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:指定 Kafka 使用的监听器协议映射。例如:PLAINTEXT:PLAINTEXT,SSL:SSL

  • KAFKA_INTER_BROKER_LISTENER_NAME:指定 broker 间通信使用的监听器名称。例如:PLAINTEXT

2. 理解参数和原理

KAFKA_LISTENERS是broker实际监听的地址。

KAFKA_ADVERTISED_LISTENERS是broker注册在zookeeper或者controller broker里面的元数据,当消费者或者生产者使用Bootstrap-Server去连接kafka集群时,集群会返回元数据等信息到客户端,客户端会根据每个broker提供的KAFKA_ADVERTISED_LISTENERS去连接对应的broker。

所以首先,集群之间,broker之间需要通信,所以每个kafka容器需要设置一个KAFKA_ADVERTISED_LISTENERS用于告诉别的容器如何连接到自己,如果容器都是处于同一bridge网络,那么直接使用容器名即可。

其次,我们想要在宿主机比如windows的idea开发,我们一般只能通过docker容器-p暴露的端口去连接kafka,所以每个kafka容器还需要设置一个KAFKA_ADVERTISED_LISTENERS来告诉宿主机的客户端,如何连接到自己,这里需要使用localhost+暴露在宿主机的端口。

那么如果KAFKA_ADVERTISED_LISTENERS里面有2个地址,如何保证broker之间的连接使用的是容器名,而宿主机客户端使用的是localhost呢?

这需要KAFKA_INTER_BROKER_LISTENER_NAME来指定前者。

并且由于KAFKA_ADVERTISED_LISTENERS里面有2个地址,所以我们还需要KAFKA_LISTENER_SECURITY_PROTOCOL_MAP来映射监听器名字。

3. Docker Compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
version: '3.8'

services:
zookeeper:
image: bitnami/zookeeper:3.8.2
container_name: zookeeper
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- kafka

kafka1:
image: bitnami/kafka:3.6.1
container_name: kafka1
depends_on:
- zookeeper
ports:
- "19092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka1:9093,EXTERNAL://localhost:19092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
networks:
- kafka

kafka2:
image: bitnami/kafka:3.6.1
container_name: kafka2
depends_on:
- zookeeper
ports:
- "29092:9092"
environment:
- KAFKA_BROKER_ID=2
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka2:9093,EXTERNAL://localhost:29092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
networks:
- kafka

kafka3:
image: bitnami/kafka:3.6.1
container_name: kafka3
depends_on:
- zookeeper
ports:
- "39092:9092"
environment:
- KAFKA_BROKER_ID=3
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka3:9093,EXTERNAL://localhost:39092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
networks:
- kafka

networks:
kafka:
driver: bridge

可以看到每个容器都设置了INTERNAL,因为指定了KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL,所以这是用于broker之间的连接,其监听在本地的0.0.0.0:9093,广播给其它broker的通信地址是<容器名>:9093,使用PLAINTEXT(明文)方式通信。

除此之外还设置了EXTERNAL,监听在本地的0.0.0.0:9092,广播给客户端的地址是localhost:19092、localhost:29092、localhost:39092,也就是windows上的客户端通过localhost:19092访问broker,这会被docker的-p映射到对应容器的9092,被0.0.0.0:9092对接。

4. 验证

连接到某个容器。创建test主题。

1
kafka-topics.sh --create --topic test --partitions 3 --replication-factor 3 --bootstrap-server kafka1:9093

查看分区和副本情况,可以看到在不同的broker上,输出中显示的是Broker ID。

1
2
3
4
5
I have no name!@7212060b6e3d:/$ kafka-topics.sh --describe --topic test --bootstrap-server kafka1:9093
Topic: test TopicId: Lo1eQ6aCQj6WiFcNiVBrcw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

引入pom包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
<!-- SLF4J API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- Logback classic (SLF4J implementation) -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>

生产者代码,通过localhost:19092连接到集群。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.dragon.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;

public class KafkaProducerTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test",
"key"+i,
"我是你爹"+i
);
//发送record
producer.send(record);
Thread.sleep(500);
}

//关闭producer
producer.close();
}
}

消费者代码,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package org.dragon.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.HashMap;

public class KafkaConsumerTest {
public static void main(String[] args) {

// 创建消费者对象
HashMap<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

config.put(ConsumerConfig.GROUP_ID_CONFIG, "mkl");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
// 消费者订阅主题
kafkaConsumer.subscribe(Collections.singletonList("test"));

try {
while (true){
// 消费者拉取消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
records.forEach(System.out::println);
}
}finally {
// 消费者关闭
kafkaConsumer.close();
}
}
}

都启动后,消费者和生产者日志正常。

Kafka基础组件图推演

Kafka基础组件图推演

1. Controller Broker

Kafka集群中有一个Controller Broker,负责元数据管理和协调。

Kafka使用Zookeeper作为集群元数据的存储和管理工具。Zookeeper保存了集群的状态信息,包括所有的Topic、分区、Leader和副本信息等。

当集群状态发生变化时,Controller Broker会将变更信息写入Zookeeper。

当现任Controller Broker发生故障时,Kafka集群中的其他Broker会检测到这一情况,并通过Zookeeper进行选举。

一个Broker成功竞选为新的Controller Broker后,会从Zookeeper读取最新的集群元数据。

保障机制

  • 持久化元数据:Zookeeper持久化存储集群的元数据,确保在任何时刻都可以获取到最新的状态信息。
  • 心跳机制:Broker与Zookeeper之间通过心跳机制保持连接,快速检测Controller Broker的故障。
  • 选举机制:通过Zookeeper的选举机制,能够快速选举出新的Controller Broker,并从Zookeeper同步元数据。

2. 组件架构

1. Log Manager

LogManager主要负责管理Kafka日志(log)的存储和检索。

比如:生产者将消息发送到Partition0的Leader Broker1。LogManager在Broker1上将消息写入Partition0的日志文件。

2. Replication Manager

ReplicationManager主要负责管理分区数据的复制和同步。

每个分区的Leader和Follower之间的同步是独立进行的。也就是说,每个分区都有自己的同步过程,不依赖于其他分区。

虽然每个分区的同步过程是独立的,但每个Broker会为它所管理的每个分区(无论是Leader还是Follower)启动相应的复制线程,这些线程负责处理具体的同步任务。

比如:ReplicationManager在Broker1上将新写入的消息推送到Partition0的Follower Broker2和Broker3。ReplicationManager在Broker2和Broker3上处理从Broker1接收的复制请求,将消息写入它们各自的日志文件。

3. SocketServer

SocketServer是Kafka Broker中的一个组件,负责处理网络连接和I/O操作。它负责接受来自客户端和其他Broker的连接请求,并为每个连接分配一个线程进行处理。

4. NetworkServer

NetworkServer是Kafka的网络通信框架的一个核心部分,负责管理和调度网络请求。它使用了NIO(非阻塞I/O)来处理高并发的网络连接。

5. ZKClient

与Zookeeper通信的组件。

Kafka基础框架图推演

Kafka基础框架图推演

1. 单点模型

1. 名词概念

  1. Broker:是指Kafka集群中的一个节点。一个Kafka集群由多个Broker组成,这些Broker共同协作来处理消息的存储、传输和消费。Broker管理一个或多个分区。

  2. Topic:生产者将消息发送到指定的Topic,消费者订阅Topic以获取消息。Topic本身只是一个逻辑上的分组,不具有物理存储的概念。

  3. Partition:是Topic的子集,是Kafka中实际存储和处理消息的基本单元。每个Topic可以分成多个Partition,每个Partition都是一个有序的、不可变的消息序列。

  4. Replica:分区可以存在多副本。

  5. Leader Broker:分区的多副本下,负责处理所有的读写请求的分区所处的broker。

  6. FollowerBroker:分区的多副本下,负责同步Leader的数据的分区所处的broker。

生产者把消息(record)发送到kafka,消费者通过偏移量(offset,类似数组的下标)获取数据。

同时每个分区会有自己的Log文件,kafka使用log文件来把数据保存到磁盘。

2. 分布式集群-横向扩展

1. Topic多分区

关于生产

生产者通过Bootstrap Broker连接到Kafka集群。这一步是为了建立初始连接,并获取集群的元数据。

一旦生产者获取了这些元数据,它就知道每个分区的Leader Broker是谁,从而可以将消息直接发送到正确的Leader Broker。

生产者发送消息时必须指定Topic,但是分区是可选的。

  • 不指定分区:如果生产者没有手动指定分区,Kafka会根据默认的分区策略将消息分配到分区。默认的分区策略如下:
    • 如果消息有键(Key),Kafka会根据键的哈希值来确定分区。相同的键总是被分配到同一个分区。
    • 如果消息没有键(Key),Kafka会使用轮询或随机的方式将消息分配到分区,以确保消息分布均匀。
  • 指定分区:生产者也可以在发送消息时明确指定分区。这样,消息会直接发送到指定的分区。

在Kafka中,生产者将消息发送到Broker时,Broker的第一个操作就是将消息记录到磁盘中,以确保消息的持久性和可靠性。

关于消费

Kafka中的消费者通常属于一个消费者组(Consumer Group)。每个消费者组有一个唯一的组ID。消费者组的概念用于实现消息的负载均衡和并行消费。

当多个消费者属于同一个组时,Kafka会将Topic的分区分配给组内的消费者。**每个分区只能由组内的一个消费者消费**,这样可以实现负载均衡。

  • 单个消费者订阅一个Topic

    • 如果只有一个消费者订阅了一个Topic,那么该消费者会接收到该Topic中的所有消息。
  • 多个消费者属于同一个组

    • Topic中的分区会在组内的消费者之间进行分配。每个分区只会被组内的一个消费者消费。
    • 如果消费者数量多于分区数,多余的消费者将不会分配到任何分区,处于空闲状态。这些消费者可以在有其他消费者退出时自动接管其分区,从而实现高可用性。
    • 如果消费者数量少于分区数,有些消费者会被分配多个分区。
  • 多个消费者属于不同的组

    • 每个组都会独立消费Topic中的所有消息。也就是说,消息会被广播到所有组中的消费者。

关于分区新增

Kafka会在集群中创建新的分区。这些新的分区会被分配到不同的Broker,以实现数据的均衡存储和高可用性。Kafka不会自动将现有分区的数据重新分配或均衡到新的分区。新的分区从创建时开始是空的,只有在后续生产者发送消息时,才会向这些新的分区写入数据。消费者组会感知到分区数量的变化,并触发重新平衡。

2. 分区多副本

Kafka允许每个分区有多个副本(Replica),这些副本存储在不同的Broker上。一个副本被称为Leader,负责处理所有的读写请求,其他副本为Follower,负责同步Leader的数据。

多个副本同时只有一个副本可以读写,这就是Leader副本,其他副本成为Follower副本,用作备份。

docker部署简单的Kafka

docker部署简单的Kafka

1. 拉取镜像

选择一组兼容性好的版本。

1
2
docker pull bitnami/kafka:3.6.1
docker pull bitnami/zookeeper:3.8.2

2. 运行

创建网络

首先,创建一个名为 kafka 的 Docker bridge 网络:

1
docker network create kafka

运行 ZooKeeper 容器

然后,运行 ZooKeeper 容器并将其连接到 kafka 网络:

1
docker run -d --name zookeeper --network kafka -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.8.2

运行 Kafka 容器

最后,运行 Kafka 容器并将其连接到 kafka 网络:

1
docker run -d --name kafka --network kafka -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 bitnami/kafka:3.6.1

这些命令将使 ZooKeeper 和 Kafka 容器在同一个 Docker 网络中运行,并确保它们可以相互通信。

3. 简单的校验

要判断 ZooKeeper 和 Kafka 容器是否正常运行,可以通过以下几个步骤进行检查:

1. 检查容器状态

首先,检查 ZooKeeper 和 Kafka 容器是否正在运行:

1
docker ps

输出应包含类似以下内容:

1
2
3
CONTAINER ID   IMAGE                        COMMAND                  CREATED          STATUS          PORTS                                        NAMES
<zookeeper_id> bitnami/zookeeper:3.8.2 "/opt/bitnami/script…" <some_time_ago> Up <some_time> 2181/tcp zookeeper
<kafka_id> bitnami/kafka:3.6.1 "/opt/bitnami/script…" <some_time_ago> Up <some_time> 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka

2. 检查 ZooKeeper 日志

查看 ZooKeeper 容器的日志,以确保它已成功启动并正在运行:

1
docker logs zookeeper

日志中应包含类似以下内容:

1
2
INFO  Started AdminServer on address 0.0.0.0, port 8080
INFO binding to port 0.0.0.0/0.0.0.0:2181

3. 检查 Kafka 日志

查看 Kafka 容器的日志,以确保它已成功连接到 ZooKeeper 并正在运行:

1
docker logs kafka

日志中应包含类似以下内容:

1
2
INFO  [KafkaServer id=1] started (kafka.server.KafkaServer)
INFO [ZooKeeperClient] Connected. (org.apache.zookeeper.ClientCnxn)

4. 使用 Kafka 命令行工具检查

进入 Kafka 容器内部,并使用 Kafka 命令行工具检查 Kafka 和 ZooKeeper 的状态:

1
2
3
4
docker exec -it kafka /bin/bash

# 列出 Kafka 主题
kafka-topics.sh --list --bootstrap-server kafka:9092

如果可以成功列出 Kafka 主题,则表示 Kafka 和 ZooKeeper 正常运行。

5. 创建和删除测试主题

可以尝试创建一个测试主题,并查看是否成功:

1
2
3
4
5
6
7
8
# 创建一个名为 test-topic 的主题
kafka-topics.sh --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server kafka:9092

# 列出所有主题,确认 test-topic 是否存在
kafka-topics.sh --list --bootstrap-server kafka:9092

# 删除 test-topic 主题
kafka-topics.sh --delete --topic test-topic --bootstrap-server kafka:9092

通过以上步骤,可以确认 ZooKeeper 和 Kafka 容器是否正常运行并相互通信。

基于Ruoyi-Cloud-Plus重构黑马项目-学成在线

基于Ruoyi-Cloud-Plus重构黑马项目-学成在线

一、系统介绍

毕设:基于主流微服务技术栈的在线教育系统的设计与实现

前端仓库:https://github.com/Xiamu-ssr/Dragon-Edu-Vue3
后端仓库:https://github.com/Xiamu-ssr/Dragon-Edu

感谢来自”疯狂的狮子”开源精神,RuoYi 微服务Plus版本:

文档地址: plus-doc

二、系统架构图

三、参考教程

主要以视频录制的方式展示。
包含:服务器选购,环境初始化,本地开发,部署系统,功能测试,性能测试,代码解析,架构探讨。

b站-木子dn

四、演示图例

机构端

运营端

用户端

开发端

支付宝沙箱版模拟网站在线完整支付流程(无营业无费用)内网穿透+局域网测试

支付宝沙箱版模拟网站在线完整支付流程(无营业无费用)内网穿透+局域网测试

环境如下

Version
手机 安卓
支付平台 支付宝
SpringBoot 3.2.1
alipay-sdk-java 4.38.200.ALL

一、介绍

系统处于开发阶段时,无需营业执照,无需任何费用,沙箱模拟网站在线完整支付流程。

参考资料如下:

1. 支付

有一个在线网站,可以为商品生成支付二维码,手机支付宝扫码,支付。

支付流程大体如下:

2. 支付结果

获取支付结果有两种方法

  • 一种为主动查询。在顾客支付后再查询方可得到正确的结果,然而这个时机是无法确定的。
  • 一种为被动接收。顾客支付后,支付宝服务器向微服务发送消息通知。

二、前提准备

1. 支付宝开放平台

  1. 注册支付宝开放平台
    https://openhome.alipay.com/

  2. 来到控制台下滑找到沙箱
    https://openhome.alipay.com/develop/manage
    或者点这里进入沙箱环境
    https://openhome.alipay.com/develop/sandbox/app

  3. 下载支付宝沙箱版到手机

2. 内网穿透

  1. 下载软件
    https://hsk.oray.com/download
    本文选择的是贝锐花生壳,会赠送一个域名。

  2. 添加映射

    • 映射类型:HTTPS
    • 外网端口:貌似改不了
    • 内网ip:port:order微服务的地址端口。

这样之后,谁往https://5m34y83626.vicp.fun/orders/receivenotify发送请求,就相当于往order微服务的/orders/receivenotify这个端点发送请求。

3. 局域网

参考这篇文章
同一Wifi下允许手机访问电脑(win10)

主要目的就是要知道,手机通过什么ip可以访问到电脑。本文是192.168.0.102,所以访问192.168.0.102:63030就相当于访问到了order微服务

三、order微服务

1. 依赖、配置

language-xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- 支付宝SDK -->
<dependency>
<groupId>com.alipay.sdk</groupId>
<artifactId>alipay-sdk-java</artifactId>
<version>4.38.200.ALL</version>
</dependency>
<!--生成二维码-->
<dependency>
<groupId>com.google.zxing</groupId>
<artifactId>core</artifactId>
<version>3.3.3</version>
</dependency>
<dependency>
<groupId>com.google.zxing</groupId>
<artifactId>javase</artifactId>
<version>3.3.3</version>
</dependency>
language-yml
1
2
3
4
5
6
7
8
9
10
11
12
server:
servlet:
context-path: /orders
port: 63030

pay:
#扫描二维码得到url
qrcodeurl: http://???:63030/orders/requestpay?payNo=%s
alipay:
APP_ID: ???
APP_PRIVATE_KEY: ???
ALIPAY_PUBLIC_KEY: ???

???填充分别为

  1. 在同一局域网中手机访问电脑的ip

  2. 沙箱环境->沙箱应用->应用信息->基本信息

  3. 沙箱环境->沙箱应用->应用信息->开发信息->应用私钥

  4. 沙箱环境->沙箱应用->应用信息->开发信息->支付宝公钥

2. 工具类

1. 二维码生成

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.xuecheng.orders.config;

import com.google.zxing.BarcodeFormat;
import com.google.zxing.EncodeHintType;
import com.google.zxing.client.j2se.MatrixToImageWriter;
import com.google.zxing.common.BitMatrix;
import com.google.zxing.qrcode.QRCodeWriter;
import com.google.zxing.qrcode.decoder.ErrorCorrectionLevel;
import com.xuecheng.base.utils.EncryptUtil;
import jakarta.servlet.ServletOutputStream;
import org.apache.commons.lang3.StringUtils;

import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;

/**
* @author mumu
* @version 1.0
* @description 二维码生成工具
* @date 2024/02/16 14:56
*/
public class QRCodeUtil {


/**
* 生成二维码
*
* @param content 二维码对应的URL
* @param width 二维码图片宽度
* @param height 二维码图片高度
* @return
*/
public String createQRCode(String content, int width, int height) throws IOException {


String resultImage = "";
//除了尺寸,传入内容不能为空
if (!StringUtils.isEmpty(content)) {


ServletOutputStream stream = null;
ByteArrayOutputStream os = new ByteArrayOutputStream();
//二维码参数
@SuppressWarnings("rawtypes")
HashMap<EncodeHintType,<
ruoyi-cloud-plus添加一个不要认证的公开新页面

ruoyi-cloud-plus添加一个不要认证的公开新页面

版本
RuoYiCloudPlus v2.1.2
plus-ui Vue3 ts

以新增一个公开的课程搜索页面为例。

一、前端

1. 组件创建

在view目录下创建一个页面的vue代码,比如

language-bash
1
src/views/customer/searchPage/index.vue

2. src/router/index.ts

为其编制一个路由。在constantRoutes中添加一组dict信息。比如

language-typescript
1
2
3
4
5
6
{

path: '/courseSearch',
component: () => import('@/views/customer/searchPage/index.vue'),
hidden: true
},

3. src/permission.ts

把页面加入前端的whiteList

language-typescript
1
2
3
4
5
6
const whiteList = [
'/login',
'/register',
'/social-callback',
'/courseSearch'
];

在浏览器输入http://localhost/courseSearch,至此这个页面已经不用登录就可以访问了。

二、后端

但是后端是有网关和认证模块的,虽然前端页面可以不用登陆了,但是如果这个页面还需要从后端获取数据,那么后端对应的controller也应该被open。

1. 设计思想

不同模块有不同的url前缀,比如

language-txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
routes:
# 认证中心
- id: ruoyi-auth
uri: lb://ruoyi-auth
predicates:
- Path=/auth/**
filters:
- StripPrefix=1
# 代码生成
- id: ruoyi-gen
uri: lb://ruoyi-gen
predicates:
- Path=/tool/**
filters:
- StripPrefix=1

并且每个模块都有可能需要open一些controller,不需要认证。那么我们进行统一设定,比如课程模块,url前缀为course,那么/course/open/**就都是被公开的端点。于是在gateway只需要把/*/open/**加入白名单即可。

2. ruoyi-gateway.yml

在nacos中修改gateway的配置文件,把/*/open/**加入whites。

language-yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
security:
ignore:
whites:
- /auth/code
- /auth/logout
- /auth/login
- /auth/binding/*
- /auth/social/callback
- /auth/register
- /auth/tenant/list
- /resource/sms/code
- /*/v3/api-docs
- /*/error
- /csrf
- /*/open/**

3. 开发Controller

在course模块中,新建一个CourseOpenController.java,内容示例如下

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package org.dromara.course.controller;

import org.dromara.course.domain.bo.CourseCategoryBo;
import org.dromara.course.domain.vo.CourseCategoryVo;
import org.dromara.course.service.CourseCategoryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/open")
public class CourseOpenController {


@Autowired
CourseCategoryService categoryService;


@GetMapping("/category/list")
public List<CourseCategoryVo> list(CourseCategoryBo bo) {

return categoryService.queryList(bo);
}


}

重启网关和课程模块即可。

视频分块上传Vue3+SpringBoot3+Minio

视频分块上传Vue3+SpringBoot3+Minio

一、简化演示

分块上传、合并分块

前端将完整的视频文件分割成多份文件块,依次上传到后端,后端将其保存到文件系统。前端将文件块上传完毕后,发送合并请求,后端拿取文件块,合并后重新上传到文件系统。

断点续传

前端遍历文件块,每次上传之前,先询问文件块是否存在,只有不存在的情况下,才会上传。

秒传

前端分割视频文件前,先询问此视频是否已经存在,存在则不再上传,后端之间返回视频信息。前端看起来就像是被秒传了。

二、更详细的逻辑和细节问题

  • 视频文件和文件块都通过文件本身计算MD5值作为唯一标志
  • 文件系统使用Minio,只要提供buckerNamepath就可以操作文件
  • 后端合并文件块成功后会删除文件块,并以MD5值为id存入数据库
  • Minio存储文件块时,依据其md5值计算path,比如取前两个字符构建二级文件夹,文件名为md5值,无后缀。所以只需要提供文件块的md5值就可以操作文件块。
  • Minio存储完整视频文件时,依据其md5值计算path,同上,文件名为md5值,携带.mp4等后缀,所以只需要提供视频文件的md5值就可以操作视频文件。
  1. 首先,前端计算视频文件的MD5值,记为fileMd5,传递MD5值来询问后端此视频文件是否存在,后端查询数据库返回结果,如果存在,则前端触发”秒传”。
  2. 如果不存在,则将视频文件分割成文件块,循环上传,每次循环,首先计算文件块的md5值,传递md5值询问后端此文件块是否存在,后端根据md5判断文件块是否存在,如果存在,前端跳过此文件块上传,直接标记为上传成功,如果不存在,则上传至后端,后端将其保存到minio。这其实就是”分块上传,断点续传”。
  3. 最后所有分块文件都上传成功,前端发起合并请求,传递视频文件的md5值和所有文件块的md5值到后端,后端进行文件块合并、文件块的删除、合并文件的上传,将信息存储在mysql数据库,将执行结果告知前端。这就是”合并分块”

可能存在的隐患

一个视频文件的文件块没有全部上传完成就终止,此时文件块将一直保存在minio中,如果之后此视频再也没有发起过上传请求,那么这些文件块都是是一种垃圾。

可以写一个定时任务,遍历Minio没有后缀的文件块,判断其创建时间距离当前是否足够久,是则删除。

三、代码示例

前端代码

language-html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
<template>
<div class="p-2">
<el-button icon="Plus" plain type="primary" @click="handleAdd">新增</el-button>
<!-- 添加或修改media对话框 -->
<el-dialog v-model="dialog.visible" :title="dialog.title" append-to-body width="500px">
<el-form ref="mediaFormRef" :model="form" :rules="rules" label-width="80px">
<el-form-item label="上传视频" prop="originalName" v-show="dialog.title=='添加视频'">
<el-upload
ref="uploadRef"
:http-request="onUpload"
:before-upload="beforeUpload"
:limit="1"
action="#"
class="upload-demo"
>
<template #trigger>
<el-button type="primary">选择视频</el-button>
</template>
<template #tip>
<div class="el-upload__tip">
支持分块上传、端点续传
</div>
</template>
</el-upload>
</el-form-item>
<el-form-item v-show="percentageShow">
<el-progress :percentage="percentage" style="width: 100%"/>
</el-form-item>
</el-form>
</el-dialog>
</div>
</template>

<script lang="ts" name="Media" setup>
import type {
UploadInstance, UploadRawFile, UploadRequestOptions, UploadUserFile} from 'element-plus'
import SparkMD5 from "spark-md5";
import {
HttpStatus} from "@/enums/RespEnum";

const dialog = reactive<DialogOption>({

visible: false,
title: ''
});
//上传视频
const baseUrl = import.meta.env.VITE_APP_BASE_API;
const uploadImgUrl = ref(baseUrl + "/media/media/image"); // 上传的图片服务器地址
const uploadRef = ref<UploadInstance>()
const needUpload = ref(true)
const chunkSize = 5*1024*1024;

const percentage = ref(0)
const percentageShow = ref(false)

/** 新增按钮操作 */
const handleAdd = () => {

dialog.visible = true;
dialog.title = "添加视频";
percentageShow.value = false;
}

//获取文件的MD5
const getFileMd5 = (file:any) => {

return new Promise((resolve, reject) => {

let fileReader = new FileReader()
fileReader.onload = function (event) {

let fileMd5 = SparkMD5.ArrayBuffer.hash(event.target.result)
resolve(fileMd5)
}
fileReader.readAsArrayBuffer(file)
}
)
}

//在上传之前,使用视频md5判断视频是否已经存在
const beforeUpload = async (rawFile: UploadRawFile) => {

needUpload.value = true;
const fileMd5 = await getFileMd5(rawFile);
form.value.id = fileMd5;
const rsp = await getMedia(fileMd5);
if(!!rsp.data && rsp.data['id'] == fileMd5){

needUpload.value = false;
proxy?.$modal.msgWarning("视频文件已存在,请勿重复上传。文件名为"+rsp.data['originalName'])
}
}

//分块上传、合并分块
const onUpload = async (options: UploadRequestOptions) => {

if(!needUpload.value){

//秒传
percentageShow.value = true;
percentage.value = 100;
dialog.visible = false;
return;
}
percentageShow.value = true;
const file = options.file
const totalChunks = Math.ceil(file.size / chunkSize);
let isUploadSuccess = true;//记录分块文件是否上传成功
//合并文件参数
let mergeVo = {

"chunksMd5": [] as string[],
"videoMd5": undefined as string | undefined,
"videoName": file.name,
"videoSize": file.size,
"remark": undefined as string | undefined
}
//循环切分文件,并上传分块文件
for(let i=0; i<totalChunks; ++i){

const start = i * chunkSize;
const end = Math.min(start + chunkSize, file.size);
const chunk = file.slice(start, end);
//计算 chunk md5
const md5 = await getFileMd5(chunk);
mergeVo.chunksMd5.push(md5);
// 准备FormData
const formData = new FormData();
formData.append('file', chunk);
formData.append('filename', file.name);
formData.append('chunkIndex', i.toString());
formData.append('totalChunks', totalChunks.toString());
formData.append('md5', md5);
//上传当前分块
try {

//先判断这个分块是否已经存在
const isExistRsp = await isChunkExist({
"md5": formData.get("md5")});
const isExist = isExistRsp.data;
//不存在则上传
if (!isExist){

const rsp = await addChunk(formData);
console.log(`Chunk ${
i + 1}/${
totalChunks} uploaded`, rsp.data);
}else {

console.log(`Chunk ${
i + 1}/${
totalChunks} is exist`);
}
percentage.value = (i)*100 / totalChunks;
} catch (error) {

isUploadSuccess = false;
console.error(`Error uploading chunk ${
i + 1}`, error);
proxy?.$modal.msgError(`上传分块${
i + 1}出错`);
break;
}
}
//合并分块文件
if(isUploadSuccess){

proxy?.$modal.msgSuccess("分块文件上传成功")
mergeVo.videoMd5 = form.value.id;//beforeUpload已经计算过视频文件的md5
//合并文件
const rsp = await mergeChunks(mergeVo);
if (rsp.code == HttpStatus.SUCCESS){

//合并文件后,实际上媒资已经插入数据库。
percentage.value = 100;
proxy?.$modal.msgSuccess("文件合并成功")
proxy?.$modal.msgSuccess("视频上传成功")
}else{

proxy?.$modal.msgSuccess("文件合并异常")
}
}else {

proxy?.$modal.msgSuccess("文件未上传成功,请重试或联系管理员")
}
}

</script>
language-javascript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
export const getMedia = (id: string | number): AxiosPromise<MediaVO> => {

return request({

url: '/media/media/' + id,
method: 'get'
});
};

/**
* 分块文件是否存在
* */
export const isChunkExist = (data: any) => {

return request({

url: '/media/media/video/chunk',
method: 'get',
params: data
});
};

/**
* 上传分块文件
* */
export const addChunk = (data: any) => {

return request({

url: '/media/media/video/chunk',
method: 'post',
data: data
});
};

/**
* 合并分块文件
* */
export const mergeChunks = (data: any) => {

return request({

url: '/media/media/video/chunk/merge',
method: 'post',
data: data
});
};

后端代码

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@RestController
@RequestMapping("/media")
public class MediaFilesController extends BaseController {

/**
* 获取media详细信息
*
* @param id 主键
*/
@GetMapping("/{id}")
public R<MediaFilesVo> getInfo(@NotNull(message = "主键不能为空")
@PathVariable String id) {

return R.ok(mediaFilesService.queryById(id));
}

@Log(title = "视频分块文件上传")
@PostMapping(value = "/video/chunk")
public R<String> handleChunkUpload(
@RequestParam("file") MultipartFile file,
@RequestParam("md5") String md5,
@RequestParam("filename") String filename,
@RequestParam("chunkIndex") int chunkIndex,
@RequestParam("totalChunks") int totalChunks) {

if (ObjectUtil.isNull(file)) {

return R.fail("上传文件不能为空");
}
Boolean b = mediaFilesService.handleChunkUpload(file, md5);
if (b){

return R.ok();
}else {

return R.fail();
}
}

@Log(title = "分块文件是否已经存在")
@GetMapping(value = "/video/chunk")
public R<Boolean> isChunkExist(@RequestParam("md5") String md5) {

return R.ok(mediaFilesService.isChunkExist(md5));
}

@Log(title = "合并视频文件")
@PostMapping(value = "/video/chunk/merge")
public R<Boolean> mergeChunks(@RequestBody MediaVideoMergeBo bo) {

bo.setCompanyId(LoginHelper.getDeptId());
Boolean b = mediaFilesService.mergeChunks(bo);
if (b){

return R.ok();
}else {

return R.fail();
}
}
}

关于如何操作Minio等文件系统,不详细写明解释。只需要知道,给Minio提供文件本身、bucketName、path即可完成上传、下载、删除等操作。具体代码不同的包都不一样。

language-java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@Service
public class MediaFilesServiceImpl implements MediaFilesService {

@Autowired
private MediaFilesMapper mediaFilesMapper;

/**
* 分块文件上传
* <br/>
* 分块文件不存放mysql信息,同时文件名不含后缀,只有md5
* @param file 文件
* @param md5 md5
* @return {@link Boolean}
*/
@Override
public Boolean handleChunkUpload(MultipartFile file, String md5) {

//只上传至minio
OssClient storage = OssFactory.instance();
String path = getPathByMD5(md5, "");
try {

storage.upload(file.getInputStream(), path, file.getContentType(), minioProperties.getVideoBucket());
} catch (IOException e) {

throw new RuntimeException(e);
}
return true;
}

@Override
public Boolean isChunkExist(String md5) {

OssClient storage = OssFactory.instance();
String path = getPathByMD5(md5, "");
return storage.doesFileExist(minioProperties.getVideoBucket(), path);
}

@Override
public Boolean mergeChunks(MediaVideoMergeBo bo) {

OssClient storage = OssFactory.instance();
String originalfileName = bo.getVideoName();
String suffix = StringUtils.substring(originalfileName, originalfileName.lastIndexOf("."), originalfileName.length());
//创建临时文件,用来存放合并文件
String tmpDir = System.getProperty("java.io.tmpdir");
String tmpFileName = UUID.randomUUID().toString() + ".tmp";
File tmpFile = new File(tmpDir, tmpFileName);

try(
FileOutputStream fOut = new FileOutputStream(tmpFile);
) {

//将分块文件以流的形式copy到临时文件
List<String> chunksMd5 = bo.getChunksMd5();
chunksMd5.forEach(chunkMd5 -> {

String chunkPath = getPathByMD5(chunkMd5, "");
InputStream chunkIn = storage.getObjectContent(minioProperties.getVideoBucket(), chunkPath);
IoUtil.copy(chunkIn, fOut);
});
//合并文件上传到minio
String videoMd5 = bo.getVideoMd5();
String path = getPathByMD5(videoMd5, suffix);
storage.upload(tmpFile, path, minioProperties.getVideoBucket());
//删除分块文件
chunksMd5.forEach(chunkMd5->{

String chunkPath = getPathByMD5(chunkMd5, "");
storage.delete(chunkPath, minioProperties.getVideoBucket());
});
} catch (Exception e) {

throw new RuntimeException(e);
}finally {

if (tmpFile.exists()){

tmpFile.delete();
}
}
//上传信息到mysql
MediaFiles mediaFiles = new MediaFiles();
mediaFiles.setId(bo.getVideoMd5());
mediaFiles.setCompanyId(bo.getCompanyId());
mediaFiles.setOriginalName(originalfileName);
mediaFiles.setFileSuffix(suffix);
mediaFiles.setSize(bo.getVideoSize());
mediaFiles.setPath(getPathByMD5(bo.getVideoMd5(), suffix));
mediaFiles.setRemark(bo.getRemark());
mediaFiles.setAuditStatus(MediaStatusEnum.UNREVIEWED.getValue());
return mediaFilesMapper.insert(mediaFiles) > 0;
}

/**
* 通过md5生成文件路径
* <br/>
* 比如
* md5 = 6c4acb01320a21ccdbec089f6a9b7ca3
* <br/>
* path = 6/c/md5 + suffix
* @param prefix 前缀
* @param suffix 后缀
* @return {@link String}
*/
public String getPathByMD5(String md5, String suffix) {

// 文件路径
String path = md5.charAt(0) + "/" + md5.charAt(1) + "/" + md5;
return path + suffix;
}

}