Kafka Producer之拦截器

Kafka Producer之拦截器

1. Producer流程

新建ProducerRecord类后,传入topic、key、value等数据构建Record之后,距离发送至kafka集群还需要经历若干过程。

  1. 拦截器列表,对数据进行过滤,更改等行为,处理异常不会导致流程终止。

  2. 获取Kafka集群元数据

  3. 对数据进行序列化

  4. 根据元数据选择分区和Broker

  5. 数据校验

  6. 进入数据发送缓存区,批次发送

  7. send

2. 代码测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class KafkaProducerInterceptorTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//指定拦截器
config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test1",
"key"+i,
"我是你爹"+i
);
//发送record
producer.send(record);
Thread.sleep(500);
}

//关闭producer
producer.close();
}
}

拦截器自定义类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package org.dragon.producer;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
* 自定义value拦截器试验<br/>
* 1. 实现接口
* 2. 定义泛型
* 3. 重写方法
*
*
* @author mumu
* @date 2024/07/15
*/
public class ValueInterceptorTest implements ProducerInterceptor<String, String> {

/**
* 发送数据时会调用这个方法<br/>
* 让value复制2次
*
* @param producerRecord 生产者记录
* @return {@link ProducerRecord}<{@link String}, {@link String}>
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
return new ProducerRecord<String, String>(producerRecord.topic(), producerRecord.key(), producerRecord.value() + producerRecord.value());
}

/**
* 发送数据完毕,服务器返回的响应,会调用此方法。
*
* @param recordMetadata 记录元数据
* @param e e
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

}

/**
* 生产者关闭,会调用此方法
*/
@Override
public void close() {

}

/**
* 创建生产者对象时调用
*
* @param map 地图
*/
@Override
public void configure(Map<String, ?> map) {

}
}

3. 测试结果

可以看到value是复制了2次,成功。

关于Kafka Topic分区和Replication分配的策略

关于Kafka Topic分区和Replication分配的策略

1. Topic多分区

如图,是一个多分区Topic在Kafka集群中可能得分配情况。

P0-RL代表分区0,Leader副本。

这个Topic是3分区2副本的配置。分区尽量均匀分在不同的Broker上,分区的Follower副本尽量不和Leader在一个Broker上。

2. 理想的策略

假设有3个Topic在含有3个Broker的Kafka集群上。

Topic1有1个分区,2个副本。

Topic2有2个分区,2个副本。

Topic3有3个分区,2个副本。

可能如下图所示。不同颜色表示不同Topic。

似乎不是特别理想,我们再调整一下,如下图

不仅每个Broker的副本数一样了,更关键的是,并且每个Broker的主Leader副本也一样的。这样更适合负载均衡。

3. 实际的策略

我们使用Kafka tool,来以此创建上述3个Topic。

首先看test1

然后看test2

然后是test3

按照上面的信息,画出来的分配结果如下图

似乎并不和我们想的一样。

查看源码,Breadcrumbskafka/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java中一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static Map<Integer, List<Integer>> assignReplicasToBrokersRackUnaware(int nPartitions,
int replicationFactor,
List<Integer> brokerList,
int fixedStartIndex,
int startPartitionId) {
Map<Integer, List<Integer>> ret = new HashMap<>();
int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size());
int currentPartitionId = Math.max(0, startPartitionId);
int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size());
for (int i = 0; i < nPartitions; i++) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0))
nextReplicaShift += 1;
int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size();
List<Integer> replicaBuffer = new ArrayList<>();
replicaBuffer.add(brokerList.get(firstReplicaIndex));
for (int j = 0; j < replicationFactor - 1; j++)
replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size())));
ret.put(currentPartitionId, replicaBuffer);
currentPartitionId += 1;
}
return ret;
}

例子(来自尚硅谷)

4. 如何自定义策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class AdminTopicTest {
public static void main(String[] args) {
//定义kafka集群配置
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");

//创建Admin管理员对象
Admin admin = Admin.create(config);

//定义Topic属性
HashMap<Integer, List<Integer>> map = new HashMap<>();
// 分区0,Leader副本在3上,第二个副本在1上。
map.put(0, Arrays.asList(3, 1));
map.put(1, Arrays.asList(2, 3));
map.put(2, Arrays.asList(1, 2));
NewTopic test4 = new NewTopic("test2", map);


//创建Topic
CreateTopicsResult result = admin.createTopics(
Arrays.asList(
test4
)
);

admin.close();
}
}

不过在手动分配时,确实需要了解每个broker的负载情况,以便做出更优的分配策略。你可以使用Kafka的AdminClient类来获取集群的状态信息

Docker Compose部署Kafka集群并在宿主机Windows连接开发

Docker Compose部署Kafka集群并在宿主机Windows连接开发

Docker for Windows 4.23.0
windows 11
Java 17

1. 常用参数

kafka容器常用参数如下

  • -e KAFKA_BROKER_ID=1:设置 Kafka broker 的 ID 为 1。每个 Kafka broker 都需要一个唯一的 ID。

  • -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181:指定 Kafka 连接到 Zookeeper 的地址,这里假设 Zookeeper 容器的名称为 zookeeper,并且它在 2181 端口监听。

  • -e ALLOW_PLAINTEXT_LISTENER=yes:允许 Kafka 使用纯文本监听器。即允许非加密的通信。

  • -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:Kafka broker 实际监听在容器内的 0.0.0.0:9092 上。这意味着 Kafka 接受来自任何网络接口的连接。

  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092:指定 Kafka 广播其监听器地址,客户端将使用该地址连接到 broker。在这个例子中,Kafka 广播它在 localhost:9092 上监听。

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:指定 Kafka 使用的监听器协议映射。例如:PLAINTEXT:PLAINTEXT,SSL:SSL

  • KAFKA_INTER_BROKER_LISTENER_NAME:指定 broker 间通信使用的监听器名称。例如:PLAINTEXT

2. 理解参数和原理

KAFKA_LISTENERS是broker实际监听的地址。

KAFKA_ADVERTISED_LISTENERS是broker注册在zookeeper或者controller broker里面的元数据,当消费者或者生产者使用Bootstrap-Server去连接kafka集群时,集群会返回元数据等信息到客户端,客户端会根据每个broker提供的KAFKA_ADVERTISED_LISTENERS去连接对应的broker。

所以首先,集群之间,broker之间需要通信,所以每个kafka容器需要设置一个KAFKA_ADVERTISED_LISTENERS用于告诉别的容器如何连接到自己,如果容器都是处于同一bridge网络,那么直接使用容器名即可。

其次,我们想要在宿主机比如windows的idea开发,我们一般只能通过docker容器-p暴露的端口去连接kafka,所以每个kafka容器还需要设置一个KAFKA_ADVERTISED_LISTENERS来告诉宿主机的客户端,如何连接到自己,这里需要使用localhost+暴露在宿主机的端口。

那么如果KAFKA_ADVERTISED_LISTENERS里面有2个地址,如何保证broker之间的连接使用的是容器名,而宿主机客户端使用的是localhost呢?

这需要KAFKA_INTER_BROKER_LISTENER_NAME来指定前者。

并且由于KAFKA_ADVERTISED_LISTENERS里面有2个地址,所以我们还需要KAFKA_LISTENER_SECURITY_PROTOCOL_MAP来映射监听器名字。

3. Docker Compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
version: '3.8'

services:
zookeeper:
image: bitnami/zookeeper:3.8.2
container_name: zookeeper
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- kafka

kafka1:
image: bitnami/kafka:3.6.1
container_name: kafka1
depends_on:
- zookeeper
ports:
- "19092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka1:9093,EXTERNAL://localhost:19092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
networks:
- kafka

kafka2:
image: bitnami/kafka:3.6.1
container_name: kafka2
depends_on:
- zookeeper
ports:
- "29092:9092"
environment:
- KAFKA_BROKER_ID=2
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka2:9093,EXTERNAL://localhost:29092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
networks:
- kafka

kafka3:
image: bitnami/kafka:3.6.1
container_name: kafka3
depends_on:
- zookeeper
ports:
- "39092:9092"
environment:
- KAFKA_BROKER_ID=3
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka3:9093,EXTERNAL://localhost:39092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
networks:
- kafka

networks:
kafka:
driver: bridge

可以看到每个容器都设置了INTERNAL,因为指定了KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL,所以这是用于broker之间的连接,其监听在本地的0.0.0.0:9093,广播给其它broker的通信地址是<容器名>:9093,使用PLAINTEXT(明文)方式通信。

除此之外还设置了EXTERNAL,监听在本地的0.0.0.0:9092,广播给客户端的地址是localhost:19092、localhost:29092、localhost:39092,也就是windows上的客户端通过localhost:19092访问broker,这会被docker的-p映射到对应容器的9092,被0.0.0.0:9092对接。

4. 验证

连接到某个容器。创建test主题。

1
kafka-topics.sh --create --topic test --partitions 3 --replication-factor 3 --bootstrap-server kafka1:9093

查看分区和副本情况,可以看到在不同的broker上,输出中显示的是Broker ID。

1
2
3
4
5
I have no name!@7212060b6e3d:/$ kafka-topics.sh --describe --topic test --bootstrap-server kafka1:9093
Topic: test TopicId: Lo1eQ6aCQj6WiFcNiVBrcw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

引入pom包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
<!-- SLF4J API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<!-- Logback classic (SLF4J implementation) -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>

生产者代码,通过localhost:19092连接到集群。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.dragon.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;

public class KafkaProducerTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test",
"key"+i,
"我是你爹"+i
);
//发送record
producer.send(record);
Thread.sleep(500);
}

//关闭producer
producer.close();
}
}

消费者代码,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package org.dragon.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.HashMap;

public class KafkaConsumerTest {
public static void main(String[] args) {

// 创建消费者对象
HashMap<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

config.put(ConsumerConfig.GROUP_ID_CONFIG, "mkl");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
// 消费者订阅主题
kafkaConsumer.subscribe(Collections.singletonList("test"));

try {
while (true){
// 消费者拉取消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
records.forEach(System.out::println);
}
}finally {
// 消费者关闭
kafkaConsumer.close();
}
}
}

都启动后,消费者和生产者日志正常。

Kafka基础组件图推演

Kafka基础组件图推演

1. Controller Broker

Kafka集群中有一个Controller Broker,负责元数据管理和协调。

Kafka使用Zookeeper作为集群元数据的存储和管理工具。Zookeeper保存了集群的状态信息,包括所有的Topic、分区、Leader和副本信息等。

当集群状态发生变化时,Controller Broker会将变更信息写入Zookeeper。

当现任Controller Broker发生故障时,Kafka集群中的其他Broker会检测到这一情况,并通过Zookeeper进行选举。

一个Broker成功竞选为新的Controller Broker后,会从Zookeeper读取最新的集群元数据。

保障机制

  • 持久化元数据:Zookeeper持久化存储集群的元数据,确保在任何时刻都可以获取到最新的状态信息。
  • 心跳机制:Broker与Zookeeper之间通过心跳机制保持连接,快速检测Controller Broker的故障。
  • 选举机制:通过Zookeeper的选举机制,能够快速选举出新的Controller Broker,并从Zookeeper同步元数据。

2. 组件架构

1. Log Manager

LogManager主要负责管理Kafka日志(log)的存储和检索。

比如:生产者将消息发送到Partition0的Leader Broker1。LogManager在Broker1上将消息写入Partition0的日志文件。

2. Replication Manager

ReplicationManager主要负责管理分区数据的复制和同步。

每个分区的Leader和Follower之间的同步是独立进行的。也就是说,每个分区都有自己的同步过程,不依赖于其他分区。

虽然每个分区的同步过程是独立的,但每个Broker会为它所管理的每个分区(无论是Leader还是Follower)启动相应的复制线程,这些线程负责处理具体的同步任务。

比如:ReplicationManager在Broker1上将新写入的消息推送到Partition0的Follower Broker2和Broker3。ReplicationManager在Broker2和Broker3上处理从Broker1接收的复制请求,将消息写入它们各自的日志文件。

3. SocketServer

SocketServer是Kafka Broker中的一个组件,负责处理网络连接和I/O操作。它负责接受来自客户端和其他Broker的连接请求,并为每个连接分配一个线程进行处理。

4. NetworkServer

NetworkServer是Kafka的网络通信框架的一个核心部分,负责管理和调度网络请求。它使用了NIO(非阻塞I/O)来处理高并发的网络连接。

5. ZKClient

与Zookeeper通信的组件。

Kafka基础框架图推演

Kafka基础框架图推演

1. 单点模型

1. 名词概念

  1. Broker:是指Kafka集群中的一个节点。一个Kafka集群由多个Broker组成,这些Broker共同协作来处理消息的存储、传输和消费。Broker管理一个或多个分区。

  2. Topic:生产者将消息发送到指定的Topic,消费者订阅Topic以获取消息。Topic本身只是一个逻辑上的分组,不具有物理存储的概念。

  3. Partition:是Topic的子集,是Kafka中实际存储和处理消息的基本单元。每个Topic可以分成多个Partition,每个Partition都是一个有序的、不可变的消息序列。

  4. Replica:分区可以存在多副本。

  5. Leader Broker:分区的多副本下,负责处理所有的读写请求的分区所处的broker。

  6. FollowerBroker:分区的多副本下,负责同步Leader的数据的分区所处的broker。

生产者把消息(record)发送到kafka,消费者通过偏移量(offset,类似数组的下标)获取数据。

同时每个分区会有自己的Log文件,kafka使用log文件来把数据保存到磁盘。

2. 分布式集群-横向扩展

1. Topic多分区

关于生产

生产者通过Bootstrap Broker连接到Kafka集群。这一步是为了建立初始连接,并获取集群的元数据。

一旦生产者获取了这些元数据,它就知道每个分区的Leader Broker是谁,从而可以将消息直接发送到正确的Leader Broker。

生产者发送消息时必须指定Topic,但是分区是可选的。

  • 不指定分区:如果生产者没有手动指定分区,Kafka会根据默认的分区策略将消息分配到分区。默认的分区策略如下:
    • 如果消息有键(Key),Kafka会根据键的哈希值来确定分区。相同的键总是被分配到同一个分区。
    • 如果消息没有键(Key),Kafka会使用轮询或随机的方式将消息分配到分区,以确保消息分布均匀。
  • 指定分区:生产者也可以在发送消息时明确指定分区。这样,消息会直接发送到指定的分区。

在Kafka中,生产者将消息发送到Broker时,Broker的第一个操作就是将消息记录到磁盘中,以确保消息的持久性和可靠性。

关于消费

Kafka中的消费者通常属于一个消费者组(Consumer Group)。每个消费者组有一个唯一的组ID。消费者组的概念用于实现消息的负载均衡和并行消费。

当多个消费者属于同一个组时,Kafka会将Topic的分区分配给组内的消费者。**每个分区只能由组内的一个消费者消费**,这样可以实现负载均衡。

  • 单个消费者订阅一个Topic

    • 如果只有一个消费者订阅了一个Topic,那么该消费者会接收到该Topic中的所有消息。
  • 多个消费者属于同一个组

    • Topic中的分区会在组内的消费者之间进行分配。每个分区只会被组内的一个消费者消费。
    • 如果消费者数量多于分区数,多余的消费者将不会分配到任何分区,处于空闲状态。这些消费者可以在有其他消费者退出时自动接管其分区,从而实现高可用性。
    • 如果消费者数量少于分区数,有些消费者会被分配多个分区。
  • 多个消费者属于不同的组

    • 每个组都会独立消费Topic中的所有消息。也就是说,消息会被广播到所有组中的消费者。

关于分区新增

Kafka会在集群中创建新的分区。这些新的分区会被分配到不同的Broker,以实现数据的均衡存储和高可用性。Kafka不会自动将现有分区的数据重新分配或均衡到新的分区。新的分区从创建时开始是空的,只有在后续生产者发送消息时,才会向这些新的分区写入数据。消费者组会感知到分区数量的变化,并触发重新平衡。

2. 分区多副本

Kafka允许每个分区有多个副本(Replica),这些副本存储在不同的Broker上。一个副本被称为Leader,负责处理所有的读写请求,其他副本为Follower,负责同步Leader的数据。

多个副本同时只有一个副本可以读写,这就是Leader副本,其他副本成为Follower副本,用作备份。

docker部署简单的Kafka

docker部署简单的Kafka

1. 拉取镜像

选择一组兼容性好的版本。

1
2
docker pull bitnami/kafka:3.6.1
docker pull bitnami/zookeeper:3.8.2

2. 运行

创建网络

首先,创建一个名为 kafka 的 Docker bridge 网络:

1
docker network create kafka

运行 ZooKeeper 容器

然后,运行 ZooKeeper 容器并将其连接到 kafka 网络:

1
docker run -d --name zookeeper --network kafka -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.8.2

运行 Kafka 容器

最后,运行 Kafka 容器并将其连接到 kafka 网络:

1
docker run -d --name kafka --network kafka -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 bitnami/kafka:3.6.1

这些命令将使 ZooKeeper 和 Kafka 容器在同一个 Docker 网络中运行,并确保它们可以相互通信。

3. 简单的校验

要判断 ZooKeeper 和 Kafka 容器是否正常运行,可以通过以下几个步骤进行检查:

1. 检查容器状态

首先,检查 ZooKeeper 和 Kafka 容器是否正在运行:

1
docker ps

输出应包含类似以下内容:

1
2
3
CONTAINER ID   IMAGE                        COMMAND                  CREATED          STATUS          PORTS                                        NAMES
<zookeeper_id> bitnami/zookeeper:3.8.2 "/opt/bitnami/script…" <some_time_ago> Up <some_time> 2181/tcp zookeeper
<kafka_id> bitnami/kafka:3.6.1 "/opt/bitnami/script…" <some_time_ago> Up <some_time> 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka

2. 检查 ZooKeeper 日志

查看 ZooKeeper 容器的日志,以确保它已成功启动并正在运行:

1
docker logs zookeeper

日志中应包含类似以下内容:

1
2
INFO  Started AdminServer on address 0.0.0.0, port 8080
INFO binding to port 0.0.0.0/0.0.0.0:2181

3. 检查 Kafka 日志

查看 Kafka 容器的日志,以确保它已成功连接到 ZooKeeper 并正在运行:

1
docker logs kafka

日志中应包含类似以下内容:

1
2
INFO  [KafkaServer id=1] started (kafka.server.KafkaServer)
INFO [ZooKeeperClient] Connected. (org.apache.zookeeper.ClientCnxn)

4. 使用 Kafka 命令行工具检查

进入 Kafka 容器内部,并使用 Kafka 命令行工具检查 Kafka 和 ZooKeeper 的状态:

1
2
3
4
docker exec -it kafka /bin/bash

# 列出 Kafka 主题
kafka-topics.sh --list --bootstrap-server kafka:9092

如果可以成功列出 Kafka 主题,则表示 Kafka 和 ZooKeeper 正常运行。

5. 创建和删除测试主题

可以尝试创建一个测试主题,并查看是否成功:

1
2
3
4
5
6
7
8
# 创建一个名为 test-topic 的主题
kafka-topics.sh --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server kafka:9092

# 列出所有主题,确认 test-topic 是否存在
kafka-topics.sh --list --bootstrap-server kafka:9092

# 删除 test-topic 主题
kafka-topics.sh --delete --topic test-topic --bootstrap-server kafka:9092

通过以上步骤,可以确认 ZooKeeper 和 Kafka 容器是否正常运行并相互通信。