Kafka 更多延伸讨论

Kafka 更多延伸讨论

1. 脑裂

因为网络问题,导致kafka出现多个controller,那么剩余的broker应该认为谁是真正的controller呢?

在zookeeper中引入controllor_epoch,表示纪元,每次controller变化,都会更新纪元值,这样其余的broker就能判断出来自controller的消息哪些是真controller,哪些是可以忽略的。

2. 零拷贝

如下图是一种消息数据传输过程,需要把数据从磁盘中读取到页缓存,然后转化成用户态,再转移数据,再转化成内核态,在把数据传输,整个流程的效率是非常低的。

优化之后,利用系统接口调用,让数据流转不需要再内核和用户之间转化,并且流程更短,复制次数更少,效率更高。

零拷贝不是数据不经过拷贝,而是对于用户空间的Kafka来说,不需要拷贝。

3. 顺写日志

顺写日志是一种数据结构,其中所有的写操作都是追加(append)到日志的末尾,而不是在日志中间进行随机写入。Kafka 的日志文件采用顺写日志结构,消息被顺序地追加到日志文件的末尾,每条消息都有一个唯一的偏移量(offset)用于标识其在日志中的位置。

当生产者发送消息给 Kafka broker 时,这些消息会被顺序地追加到对应分区的日志文件末尾。顺序写入意味着磁盘的写操作可以以很高的速度完成,因为硬盘(尤其是传统的磁盘驱动器)对于顺序写入的性能非常高,远高于随机写入。

消费者从 Kafka 读取消息时,是根据偏移量从日志中顺序读取消息的。由于日志文件是顺序写入的,消费者可以高效地顺序读取消息。消费者可以选择从特定的偏移量开始读取消息,以便实现灵活的消息处理。

为了管理日志文件的大小,Kafka 将每个分区的日志文件分为多个段(segment)。当一个段达到预设的大小或时间限制时,会关闭当前段并创建一个新的段。这种分段机制使得 Kafka 能够高效地管理日志文件,同时支持日志的删除和压缩。

Kafka 的顺写日志是其高性能和高可靠性的关键基础。通过将所有写入操作顺序追加到日志末尾,Kafka 能够高效地处理大量消息,并提供持久性保证和灵活的消费模式。顺写日志的结构使得 Kafka 在分布式环境中能够简单而高效地实现复制和故障恢复,同时支持日志保留和压缩,以适应不同的业务需求。

4. Kafka简单优化

1. 操作系统

Kafka 的网络客户端底层使用Java NI0的 Selector 方式,而 Selector 在 Linux 的实现是 epoll,在 Windows 上实现机制为 select。因此 Kafka 部署在 Linux 会有更高效的I/0 性能。
数据在磁盘和网络之间进行传输时候,在 Linux 上可以享受到零拷贝机制带来的快捷和便利高效,而 Windows 在一定程度上会使用零拷贝操作。所以建议 Kafka 部署在Linux操作系统上。

2. 磁盘选择

Kafka 存储方式为顺序读写,机械硬盘的最大劣势在于随机读写慢。所以使用机械硬盘并不会造成性能低下。所以磁盘选用普通机械硬盘即可,Kafka自身已经有冗余机制,而且通过分区的设计,实现了负载均衡的功能。不做磁盘组raid阵列也是可以的。使用机械磁盘成本也低得多。

3. 网络带宽

设计场景:如果机房为干兆带宽,我们需要在一小时内处理 1TB的数据,需要多少台kafka 服务器?

由于带宽为干兆网,1000Mbps=1Gbps,则每秒钟每个服务器能收到的数据量为1)1Gb=1000Mb
假设 Kafka 占用整个服务器网络的 70%(其他 30%为别的服务预留),则 Kafka可以使用到 700Mb的带宽,但是如果从常规角度考虑,我们不能总让 Katka顶满带宽峰值,所以需要预留出 2/3 甚至 3/4的资源,也就是说,Kanka 单台服务器使用带宽实际应为 700Mb/3=240Mb

3)1小时需要处理 1TB 数据,1TB=102410248M6-8000000Mb,则一秒钟处理数据量为:8000000Mb/3600s=2330Mb数据。
需要的服务器台数为:2330Mb/240Mb~10 台。
考虑到消息的副本数如果为 2,则需要 20 台服务器,副本如果为 3,则需要 30台服务器。

4. 内存配置

Katka 运行过程中设计到的内存主要为 JVM的堆内存操作系统的页缓存,每个Broker 节点的堆内存建议 10-15G内存,而数据文件(默认为 1G)的 25%在内存就可以了综合上述,Kafka 在大数据场景下能够流畅稳定运行至少需要11G,建议安装Kafka的服务器节点的内存至少大于等于 16G。

5. CPU

在生产环境中,建议 CPU核数最少为 16 核,建议 32核以上,方可保证大数据环境中的 Katka集群正常处理与运行。

6. 集群容错

  • 副本分配策略:一般建议2个及以上副本来保证高可用。

  • 故障转移方案:Kafka某Broker故障后,会将其负责的分区副本转移到其他存活的Broker下,并自动选择新的主分区。

  • 数据备份和恢复:kafka基于日志文件的存储方式,每个Broker上都有副本数据,可以通过配置策略来优化这部分。

7. 参数优化

参数名 默认参数值 位置 优化场景 备注
num.network.threads 3 服务端 低延迟
num.io.threads 8 服务端 低延迟
socket.send.buffer.bytes 102400 (100K) 服务端 高吞吐
socket.receive.buffer.bytes 65536 (64K) 服务端 高吞吐场景
max.in.flight.requests.per.connection 5 生产端 并等
buffer.memory 33554432 (32M) 生产端 高吞吐
batch.size 16384 (16K) 生产端 提高性能
linger.ms 0 生产端 提高性能
fetch.min.bytes 1 消费端 提高性能 网络交互次数
max.poll.records 500 消费端 批量处理 控制批量获取消息数量
fetch.max.bytes 57671680 (55M) 消费端 批量处理 控制批量获取消息字节大小

8. 压缩算法

压缩算法 压缩比率 压缩效率 解压缩效率
snappy 2.073 580m/s 2020m/s
lz4 2.101 800m/s 4220m/s
zstd 2.884 520m/s 1600m/s

Kafka Consumer

Kafka Consumer

1. Consumer基本流程

  1. 初始化消费者

首先,创建一个 Kafka 消费者实例,并设置必要的配置属性,例如 Kafka 集群地址、消费者组 ID、序列化器等。

1
2
3
4
5
6
7
8
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("isolation.level", "read_committed"); // 可选,确保事务性消息的一致性

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  1. 订阅主题

创建消费者实例后,订阅一个或多个主题,以便消费者能够从这些主题中获取消息。

1
consumer.subscribe(Arrays.asList("my-topic"));
  1. 轮询消息

消费者使用 poll 方法从 Kafka 中拉取消息。poll 方法会阻塞指定的时间,并返回拉取到的消息记录。

1
2
3
4
5
6
7
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
  1. 处理消息

在轮询到消息后,需要对消息进行处理。处理逻辑根据具体业务需求来实现。

1
2
3
4
for (ConsumerRecord<String, String> record : records) {
// 处理消息的业务逻辑
process(record);
}
  1. 提交偏移量

为了保证消息不会被重复处理或丢失,消费者需要提交已经处理过的消息偏移量。可以选择自动提交或手动提交偏移量。

  • 自动提交
    自动提交偏移量由 enable.auto.commit 属性控制。如果设置为 true,消费者会定期自动提交偏移量。

    1
    2
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
  • 手动提交
    手动提交提供了更细粒度的控制。可以在处理完一批消息后手动提交偏移量。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    try {
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    // 处理消息
    process(record);
    }
    consumer.commitSync(); // 同步提交偏移量
    // 或者使用异步提交
    // consumer.commitAsync();
    }
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    consumer.close();
    }
  1. 关闭消费者

在应用程序关闭时,确保关闭消费者以释放资源。close 方法会确保任何未提交的偏移量被提交,并关闭网络连接。

1
consumer.close();

以下是一个完整的消费者示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("isolation.level", "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}

2. 数据消费偏移量问题

consumer默认会定时把当前消费到哪里的偏移量更新到Kafka 的内部主题 __consumer_offsets 中。这是一个特殊的主题,用于存储消费者组的偏移量信息。但是这会存在一些问题。比如

  1. 默认每5000ms(由auto.commit.interval.ms决定)自动更新一次偏移量,当consumer这次拉取了100条数据,我消费到了第50条时consumer崩溃了,此时还未更新偏移量,下一次consumer启动后,会重复消费那50条数据。

  2. 即使换成手动提交,也存在同样的问题,消费到一半,consumer崩溃了,还没走到手动提交那一步,kafka记录的偏移量是过时的,所有下一次consumer重启,一样会重复消费。

3. 事务级别的问题

即使生产者使用了事务,却仍然可以导致consumer消费到不该消费的数据。

Kafka 的事务机制

  1. 事务性生产者

    • 当一个事务性生产者开始一个事务时,它会生成一个唯一的 transactional.id,并通过 beginTransaction() 开始事务。
    • 所有在这个事务中的消息会被标记为事务性的,并且这些消息的状态(已提交或已中止)会被记录在事务日志中。
    • 当事务成功完成时,调用 commitTransaction(),所有的消息都会被原子地标记为已提交。
    • 如果事务失败,调用 abortTransaction(),所有在这个事务中的消息都会被标记为已中止,并不会对外可见。
  2. 事务协调器

    • Kafka 的事务协调器负责管理事务的状态。
    • 它确保所有涉及的分区的一致性,并在事务提交或中止时更新事务日志。

消费者的隔离级别

Kafka 消费者有两种隔离级别:

  1. read_uncommitted(默认值):

    • 在这种隔离级别下,消费者可以读取到所有的消息,包括未提交的事务性消息。
    • 这种隔离级别可能会导致消费者读取到事务中止后的消息,从而出现消费者消费到未提交或中止事务的数据的情况。
  2. read_committed

    • 在这种隔离级别下,消费者只能读取到已提交的事务性消息。
    • 消费者会过滤掉未提交和已中止的事务消息,确保只消费到已经成功提交的消息。
    • 使用这种隔离级别可以确保端到端的精确一次语义。
1
consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

如果消费者的隔离级别设置为 read_uncommitted,那么它会读取到所有消息,包括未提交的事务性消息。因此,即使事务中止,消费者也可能已经读取到这些消息。

4. 消费者组

一个消费者组只能订阅多个主题,组内的消费者共同消费这些主题。

一个分区只能由组内的一个消费者消费

如果消费者组的消费者数量大于topic的分区数,那么有的consumer将会空闲,用作备用。

如果小于topic分区数,那么有的消费者会消费多个分区。

当消费者组的消费者数量大于topic的分区时,此时有消费者崩溃,那么空闲的消费者会代替它,那么如何从刚才消费的位置继续消费呢?

  • 当消费者处理完一批消息并提交偏移量时,会将偏移量信息写入到 __consumer_offsets 主题中。
  • 记录的键包括消费者组 ID、主题名称和分区号,值是该分区的最新偏移量。
  • 在消费者组重新平衡后,新分配到分区的消费者会从 __consumer_offsets 主题中读取对应的偏移量信息。
  • 由于使用了消费者组 ID、主题名称和分区号的组合键,新的消费者能够准确读取到该分区的最新提交偏移量。

这些过程对于消费者都是透明的,并不需要代码手动操作。

分区分配策略

消费者组的平衡过程中,分区和消费者没有强绑定关系,意味着某个分区之前由消费者A消费,也许之后会由消费者B消费。

至于如何分配,是由Leader决定的,第一个加入组的消费者成为Leader(群主)。

在 Apache Kafka 中,分区具体分配给组内消费者的过程是由消费者组协调器(称为群主)决定的。Kafka 提供了多种分区分配策略,以便更好地适应不同的使用场景和需求。主要的分配策略包括:

  1. RangeAssignor(范围分配)

    • 每个主题的分区列表按照范围进行划分,将连续的一组分区分配给一个消费者。
    • 如果有多个主题,每个主题独立进行分区分配。
    • 比如5分区3消费者,就是[1,2], [3,4], [5]
  2. RoundRobinAssignor(轮询)

    • 将所有订阅的主题的分区视为一个统一的分区列表,按照轮询的方式将分区依次分配给消费者。
    • 这种策略通常能够实现较为均匀的分区分配,无论分区数和消费者数是否均匀。
  3. StickyAssignor(粘性)

    • 第一次分配后,组成员保留分配给自己的分区信息
    • 尝试在每次再平衡时保持分区分配的稳定性,尽量减少分区的重新分配。
    • 有助于减少消费者的重新分配和重启开销。
  4. CooperativeStickyAssignor(优化粘性)

    • 前面的基于EAGER协议,平衡时会让所有消费者放弃当前分区,关闭连接,清理资源,全部统一平衡。
    • CooperativeStickyAssignor 使用COOPERATIVE协议,小规模平衡
    • 基于 StickyAssignor 进一步优化,减少在再平衡过程中的分区重新分配次
  5. Custom Assignor

    • 用户可以实现自己的分区分配策略,只需实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 接口,并在配置中指定自定义的分配策略。

选择合适的分区分配策略取决于具体的使用场景和需求。例如,如果需要较高的分区分配稳定性,可以选择 StickyAssignor 或 CooperativeStickyAssignor;如果需要较为均匀的分区分配,可以选择 RoundRobinAssignor。

配置分配策略时,可以在消费者配置中指定 partition.assignment.strategy 属性,例如:

1
2
3
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
//or
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

Leader选举

当一个新的消费者加入现有的消费者组时,Kafka 确实会触发消费者组的重新平衡(rebalance)。在重新平衡过程中,所有消费者会暂时断开与消费者组协调器的连接,然后重新加入消费者组。这是为了确保分区可以被均匀地重新分配给所有的消费者。

  1. 短暂中断

    • 在重新平衡过程中,消费者组内的所有消费者会有一个短暂的中断期,这段时间内不会有消息被消费。
  2. 负载均衡

    • 重新平衡确保分区能够均匀地分配给所有活跃的消费者,实现负载均衡。
  3. 领导者变化

    • 领导者消费者可能会发生变化,但这不会影响消息消费的整体流程,只是内部管理机制的一部分。

一般来说,消费者组的消费者的PARTITION_ASSIGNMENT_STRATEGY_CONFIG策略都保持一致,因此leader的更换并不会改变分区分配策略。

Kafka之存储设计

Kafka之存储设计

1. 分区和副本的存储结构

在一个多 broker 的 Kafka 集群中,topic 的分区和副本在各个 broker 上的存储文件夹分布如下:

假设有以下设置:

  • 一个 Kafka 集群包含 3 个 broker(broker 0, broker 1, broker 2)。
  • 一个 topic my-topic,有 3 个分区(partition 0, partition 1, partition 2)。
  • 每个分区有 2 个副本。

1. 分区和副本的分布

Kafka 会在多个 broker 之间分配分区和副本。假设分配如下:

  • partition 0
    • leader: broker 0
    • follower: broker 1
  • partition 1
    • leader: broker 1
    • follower: broker 2
  • partition 2
    • leader: broker 2
    • follower: broker 0

2. 存储目录结构

每个 broker 的数据目录结构如下(假设 log.dirs 配置为 /var/lib/kafka/data):

  • Broker 0 (/var/lib/kafka/data)
1
2
3
4
5
6
7
8
9
/var/lib/kafka/data
└── my-topic-0 # partition 0 leader
├── 00000000000000000000.log
├── 00000000000000000000.index
├── 00000000000000000000.timeindex
└── my-topic-2 # partition 2 follower
├── 00000000000000000000.log
├── 00000000000000000000.index
├── 00000000000000000000.timeindex
  • Broker 1 (/var/lib/kafka/data)
1
2
3
4
5
6
7
8
9
/var/lib/kafka/data
└── my-topic-0 # partition 0 follower
├── 00000000000000000000.log
├── 00000000000000000000.index
├── 00000000000000000000.timeindex
└── my-topic-1 # partition 1 leader
├── 00000000000000000000.log
├── 00000000000000000000.index
├── 00000000000000000000.timeindex
  • Broker 2 (/var/lib/kafka/data)
1
2
3
4
5
6
7
8
9
/var/lib/kafka/data
└── my-topic-1 # partition 1 follower
├── 00000000000000000000.log
├── 00000000000000000000.index
├── 00000000000000000000.timeindex
└── my-topic-2 # partition 2 leader
├── 00000000000000000000.log
├── 00000000000000000000.index
├── 00000000000000000000.timeindex

3. 文件描述

每个分区目录包含多个文件:

  • .log 文件:存储实际的消息数据。
  • .index 文件:存储消息偏移量索引,以便快速定位消息。
  • .timeindex 文件:存储消息时间戳索引,以便基于时间进行查找。

2. 相关配置

在 Apache Kafka 中,消息到达 leader broker 后,实际上是先写入操作系统的页缓存,然后由操作系统决定何时将数据刷入磁盘。

Kafka 允许通过配置参数来控制消息何时刷入磁盘。主要有以下几个重要的参数:

  • log.flush.interval.messages:指定在写入多少条消息后,强制将数据刷入磁盘。默认为 Long.MAX_VALUE,即不基于消息数量进行刷盘。
  • log.flush.interval.ms:指定时间间隔(以毫秒为单位),强制将数据刷入磁盘。默认为 Long.MAX_VALUE,即不基于时间进行刷盘。
  • log.flush.scheduler.interval.ms:默认值为 3000 毫秒。这只是一个检查的频率,实际的刷盘行为是由 log.flush.interval.ms 决定的。当调度器检查时,如果发现已经超过了 log.flush.interval.ms 设置的时间间隔,就会触发刷盘操作。
  • log.segment.bytes:控制单个日志段文件的最大大小,当一个日志段文件达到指定大小时,Kafka 会创建一个新的日志段文件,默认值1G。
  • log.segment.delete.delay.ms:控制日志段文件在被删除之前的延迟时间。当一个日志段文件被标记为删除后,Kafka 会等待指定的延迟时间才会真正删除该文件。这为潜在的恢复操作提供了缓冲时间。默认值60000 ms。
  • log.roll.mslog.roll.hours:控制日志段文件的滚动时间间隔,无论日志段文件的大小如何,当达到指定的时间间隔时,Kafka 会创建一个新的日志段文件。log.roll.hours默认值168 小时(7 天)。

3. 数据文件类型

  1. .index 文件

    • 描述:这是 Kafka 的偏移量索引文件。它用于快速查找消息在日志文件中的位置。
    • 命名格式00000000000000000000.index
    • 作用:通过这个索引文件,Kafka 可以快速定位消息在日志文件中的物理位置,以便更快地读取消息。
  2. .log 文件

    • 描述:这是 Kafka 的日志文件,存储实际的消息数据。
    • 命名格式00000000000000000000.log
    • 作用:包含了生产者发送的消息内容。每个日志文件是一个分区的一部分,日志文件的命名表示消息的起始偏移量。
  3. .timeindex 文件

    • 描述:这是 Kafka 的时间戳索引文件,存储消息的时间戳索引。
    • 命名格式00000000000000000000.timeindex
    • 作用:通过这个文件,Kafka 可以根据时间戳快速查找消息。这个文件对于实现基于时间的消息查找非常重要。
  4. .snapshot 文件

    • 描述:这是 Kafka 的快照文件,记录了日志段的元数据快照。
    • 命名格式00000000000000000016.snapshot
    • 作用:用于恢复日志段的元数据,保证在崩溃恢复时能够正确地重建索引和时间戳数据。
  5. leader-epoch-checkpoint 文件

    • 描述:这是 Kafka 用于记录 leader 选举周期的检查点文件。
    • 作用:记录了分区的 leader 副本在不同的选举周期中的偏移量信息,帮助 Kafka 在故障恢复时确定正确的 leader 和消息偏移量。
  6. partition.metadata 文件

    • 描述:这是 Kafka 的分区元数据文件。
    • 作用:存储分区的基本元数据信息,如分区的 leader、replica 列表等,用于分区的管理和协调。

4. 数据定位原理

log等文件直接打开会乱码,使用以下工具可以解析到控制台。

1
kafka-run-class.sh kafka.tools.DumpLogSegments --files /path/to/log-file.log --print-data-log

一个log文件里面有如下内容,

Kafka 日志文件中的内容并不是简单的按行排列的消息,而是采用了批处理(batch)的方式来存储消息。

那么.index文件中可能是如下内容:

1
offset: 3 position: 95

.index 文件并不会为每一条消息都记录映射关系,而是每隔一定的字节数(由配置 log.index.interval.bytes 决定,默认4096)记录一次。

如上图,

LogSegment 类

LogSegment 主要负责一个段的日志管理。它包括:

  • 日志文件(.log):存储实际的消息数据。
  • 偏移量索引文件(.index):存储消息偏移量到物理位置的映射。
  • 时间戳索引文件(.timeindex):存储消息时间戳到物理位置的映射。

UnifiedLog 类

UnifiedLog 管理一个分区的所有日志段。它通过跳表(ConcurrentSkipListMap)实现多个 LogSegment 日志的连续存储。UnifiedLog 的主要职责包括:

  • 消息写入:将消息追加到当前活动的 LogSegment 中。如果当前日志段已满,滚动到新的日志段。
  • 消息读取:根据偏移量或时间戳查找并读取消息,可能跨越多个日志段。
  • 日志截断:根据保留策略(如日志保留时间或大小),截断过期或不需要的日志段。
  • 数据恢复:在 broker 重启或故障恢复时,从日志段中恢复数据。

如图,要查询偏移量为7的数据:

  1. 通过跳表定位到对应的LogSegment

  2. 通过.index,经由二分法等高效定位指定偏移量的位置(如果没记录,则使用最大的小于偏移量位置)

  3. 按照指定位置快速定位到偏移量7的位置(或更前面一些)

5. 副本数据同步

follower会定时向leader拉取数据。

HW水位线

水位线(HW)是 Kafka 中每个分区的一个偏移量,它表示已经被所有同步副本(leader 和 follower)确认并复制的最高偏移量。

  • 数据一致性:HW 确保只有那些已经被所有同步副本成功复制的消息才会对消费者可见。这样可以防止数据不一致的问题,防止读取到未被完全复制的消息。

  • 数据可靠性:HW 确保了在系统发生故障时,数据不会丢失,并且消费者读取到的数据是可靠的。如果设置了 acks=all,那么只有当所有同步副本都确认收到消息后,HW 才会更新。这确保了数据已经被多个副本存储,防止数据丢失。

  • 故障恢复:当 leader 副本故障时,Kafka 会从同步副本中选举一个新的 leader 副本。新的 leader 会从 HW 位置开始,确保它拥有所有已提交的消息。

  • 提高数据处理的可靠性和简化系统设计。生产者和消费者不需要处理复杂的数据一致性逻辑,只需依赖 Kafka 的 HW 机制。消费者读取的数据都是已经被确认的可靠数据,避免处理未确认数据带来的复杂性和错误。

LEO末端偏移量

LEO(Log End Offset)是 Kafka 中的一个重要概念,代表一个分区的日志末端偏移量。具体来说,LEO 是指分区中下一条待写入消息的偏移量。

HW更新原理

Leader会记录所有副本的LEO,以及HW。

Follower会记录自己的LEO,以及HW。

  1. 消息来到Leader,Leader更新自身LEO。

  2. Follower向Leader同步数据,同步发送自身LEO,Leader更新LEO数据,并更新HW。

  3. Leader将数据返回到Follower,并携带HW,Followe同步HW的值,并更新自身LEO。

如此反复,LEO和HW就在不断地更新。

6. 数据清除

  • log.retention.hourslog.retention.minuteslog.retention.ms:日志保留的时间。超过这个时间的日志文件将被删除。log.retention.hours默认值为168(即 7 天)
  • log.retention.check.interval.ms:指定 Kafka Broker 多长时间检查一次日志文件,并根据配置的日志保留策略删除或压缩过期的日志文件。默认值:300000 毫秒(即 5 分钟).
  • log.retention.bytes:每个分区保留的最大日志大小,超过这个大小的日志将被删除。默认值:-1(表示没有大小限制)。
  • log.cleanup.policy:日志清理策略,支持 delete 和 compact 两种模式。delete 模式表示根据保留策略删除旧日志,compact 模式表示日志压缩。默认值为delete。
  • log.cleaner.min.cleanable.ratio:日志分段中可以被清理的最小比例。仅当分段中可清理的日志比例超过此值时,才会触发日志压缩。
  • log.cleaner.delete.retention.ms:被标记为删除的记录在清理前的保留时间(以毫秒为单位)。在此时间之后,记录将从日志中永久删除。

关于 log.cleanup.policy=compact,因为数据会丢失,所以这种策略只适用于保存数据最新状态的特殊场景。压缩步骤如下:

  1. 标记旧数据:
    Kafka会通过定期扫描日志分段(log segment)来查找每个key的最新值。对于同一个key,Kafka会将旧的值标记为删除(通常是通过在记录上设置一个删除标记)。

  2. 合并过程:
    Kafka在后台运行一个合并过程(compaction process),这个过程会将分段中旧的key值对删除,保留最新的key值对。合并过程是增量进行的,Kafka并不会在每次写入消息时都触发这个过程。

  3. 实际删除:
    被标记为删除的key值对并不会立即从日志分段中删除。Kafka的压缩过程是定期进行的,时间间隔和触发条件可以通过配置参数来调整。默认情况下,Kafka会在后台线程中异步执行这个压缩过程。

Kafka Producer之事务性

Kafka Producer之事务性

事务性可以防止跨会话幂等性失效,同时也可以保证单个生产者的指定数据,要么全部成功要么全部失败,不限分区。不可以多个生产者共用相同的事务ID。

1. 跨会话幂等性失效

幂等性开启后,broker会对每个分区记录生产者状态,并且生产者具有PID,消息被标记为PID加上序列号,数据重复和有序都是在其基础之上运作的。

生产者重启等因素会导致PID变化,导致幂等性短暂失效。

2. 开启事务

因为事务是基于幂等性的,所以幂等性的配置都要有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package org.dragon.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerTransactionTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//配置acks等级
config.put(ProducerConfig.ACKS_CONFIG, "-1");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.RETRIES_CONFIG, 5);
// 把buffer改小一点,让测试数据组成更多batch
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
// 事务ID
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
//初始化事务
producer.initTransactions();

try {
// 开启事务
producer.beginTransaction();
for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test2",
"" + i,
"我是你爹" + i
);
//发送record
Future<RecordMetadata> send = producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("回调信息:消息发送成功");
}
});
System.out.println("发送数据");
send.get();
}
// 提交事务
producer.commitTransaction();
}catch(Exception e) {
// 中止事务
producer.abortTransaction();
e.printStackTrace();
}finally{
//关闭producer
producer.close();
}
}
}

3. 事务流程原理

  1. 查找联系事务管理器

  2. 根据设置的TRANSACTIONAL_ID_CONFIG计算PID,计算方式为哈希值%分区数量

  3. 初始化事务

  4. 将涉及到的分区信息发送给事务管理器,方便事务管理器管理和监控这些分区的事务状态。

  5. 生成数据,发送数据到对应Broker

  6. 对应Broker把分区信息发送给事务管理器,为了确认哪些分区确实已经收到了事务中的消息

  7. 对应Broker返回ACKS

  8. 生产者发起结束事务的请求

  9. 修改事务状态为准备提交

  10. 事务管理器将事务标记为成功或者失败,并通知对应broker。

  11. 修改事务状态为已提交

Kafka Producer之幂等性

Kafka Producer之幂等性

幂等性通过消耗时间和性能的方式,解决乱序和重复问题。

但是只能保证同一生产者在一个分区中的幂等性

1. 启用幂等性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//配置acks等级
config.put(ProducerConfig.ACKS_CONFIG, "-1");
//启用幂等性
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 消息失败重试次数
config.put(ProducerConfig.RETRIES_CONFIG, 5);
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

幂等性操作要求:

  1. ACKS = -1

  2. 开启重试机制

  3. 在途请求缓冲区不能大于5

2. 底层变化

消息会被标记,包含生产者ID和消息序列号。

( 如果生产者重启,那么ID会变化,这会使得下图记录无效,幂等性短暂失效。)

并且broker中的ProducerState会记录每个分区的生产者状态,包括最新5个消息的序列号。

3. 数据不重复

消息来到broker分区,经由ProducerState的数据进行对比,

  • 重复则丢弃消息,返回ack。

  • 否则Broker存储消息并返回ack。

4. 数据有序

消息来到broker分区,经由ProducerState的数据进行对比,

  • 如果新消息的序列号是连续的,Broker会接受并存储该消息,然后更新最新序列号。
  • 如果新消息的序列号不连续,Broker会认为这是重复消息或乱序消息,根据配置,它可能会丢弃或拒绝该消息。
  • 无论消息被接受还是丢弃,Broker都会返回一个ack给生产者。

不连续时可能拒绝多个消息,那么这些消息都会返回生产者重新发送,直到按顺序下一个消息到来,才存储并更新。

Kafka Producer之数据重复和乱序问题

Kafka Producer之数据重复和乱序问题

为了可靠性,Kafka有消息重试机制,但是同时也带来了2大问题

1. 数据重复

消息发送到broker后,broker记录消息数据到log中,但是由于网络问题,producer没有收到acks,于是再次发送消息。

除此之外,也可能是其他场景,导致了消息的重复。

2. 数据乱序

如图,消息2、3发送到了broker,但是data1因为网络问题没有到broker,然后被producer重试了,第二次到了,但是顺序乱了。

Kafka Producer之ACKS应答机制

Kafka Producer之ACKS应答机制

1. 应答机制

异步发送的效率高,但是不安全,同步发送安全,但是效率低。

无论哪一种,有一个关键的步骤叫做回调,也就是ACKS应答机制。

其中ACKS也分为3个等级。默认等级是all。

等级 效率 安全
all(-1) 效率低 安全性高
1 效率中 安全性中
0 效率高 安全性低

2. 等级0

  • 生产者发送消息到Kafka集群。
  • 消息进入网络发送队列。
  • 生产者立即返回(认为消息已发送),不等待任何Broker的确认。

3. 等级1

  • 生产者发送消息到Kafka集群。
  • Leader分区接收消息,将消息写入本地日志。
  • Leader分区将消息同步到磁盘(如果配置了日志刷新)。
  • Leader分区返回确认(ACK)给生产者。
  • 生产者收到ACK,继续处理下一条消息。

4. 等级all

  • 生产者发送消息到Kafka集群。
  • Leader分区接收消息,将消息写入本地日志。
  • Leader分区将消息同步到磁盘(如果配置了日志刷新)。
  • Leader分区将消息发送给所有同步副本(ISR)。
  • 每个同步副本(Follower)将消息写入本地日志并返回确认给Leader。
  • Leader分区收到所有同步副本的确认后,返回ACK给生产者。
  • 生产者收到ACK,继续处理下一条消息。

5. 设置等级

1
2
3
4
5
6
7
8
9
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//配置acks等级
config.put(ProducerConfig.ACKS_CONFIG, "-1");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

6. ISR

ISR的定义:

  • 成员:ISR包括Leader和所有与Leader保持同步的Follower分区。保持同步的标准是Follower分区的日志不落后于Leader分区超过指定的时间(由replica.lag.time.max.ms配置)。
  • 目的:确保在Leader发生故障时,可以迅速从ISR中选举一个新的Leader,从而保证分区的高可用性。

ISR的动态调整:

  • Kafka会动态调整ISR的成员。如果一个Follower分区落后于Leader超过一定的时间,Kafka会将其从ISR中移除。
  • 当该Follower分区重新追上Leader并满足同步标准时,Kafka会将其重新加入ISR。
Kafka Producer之消息异步发送和同步发送

Kafka Producer之消息异步发送和同步发送

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class KafkaProducerCallbackTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test2",
""+i,
"我是你爹"+i
);
//发送record
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("回调信息:消息发送成功");
}
});
System.out.println("发送数据");
}

//关闭producer
producer.close();
}
}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class KafkaProducerCallbackTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test2",
""+i,
"我是你爹"+i
);
//发送record
Future<RecordMetadata> send = producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("回调信息:消息发送成功");
}
});
System.out.println("发送数据");
send.get();
}

//关闭producer
producer.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
Kafka Producer发送消息流程之Sender发送线程和在途请求缓存区

Kafka Producer发送消息流程之Sender发送线程和在途请求缓存区

1. Sender发送数据

Sender线程负责将已经在RecordAccumulator中准备好的消息批次发送到Kafka集群。虽然消息在RecordAccumulator中是按照分区组织的,但Sender线程在发送这些消息时,会按照broker而不是分区来组织发送。这有助于提高发送效率和减少网络开销。

1. 发送数据的详细过程:

  1. 拉取批次Sender线程从RecordAccumulator中拉取已准备好发送的消息批次。这些批次可能来自多个分区。

  2. 按broker组织批次Sender线程会将这些批次按目标broker进行组织,因为一个broker通常负责多个分区的消息处理。这个过程涉及以下步骤:

    • 确定每个分区的leader broker。
    • 将属于同一个broker的所有分区的批次组合在一起。
  3. 发送请求Sender线程会为每个broker创建一个或多个Produce请求(ProduceRequest),然后通过网络将这些请求发送到对应的broker。这些请求包含了该broker负责的所有分区的消息批次。

  4. 处理响应Sender线程会等待broker的响应。响应中包含了每个分区的消息是否成功写入的信息。

    • 如果某个分区的消息写入失败,Sender线程会根据重试机制重试发送这些消息。
    • 如果所有消息都成功写入,Sender线程会从RecordAccumulator中移除这些消息批次。

2. 关键参数配置

以下是一些关键参数,可以影响Sender线程的行为:

  • max.in.flight.requests.per.connection:每个连接允许的最大未完成请求数。默认值为5。如果这个值过大,可能会导致消息重排序。
  • request.timeout.ms:请求超时时间。默认值为30秒。如果broker在此时间内没有响应,Sender线程会重试或失败。
  • retries:重试次数。默认值为0。指定Sender线程在发送消息失败时的重试次数。
  • retry.backoff.ms:重试间隔时间。默认值为100ms。指定每次重试之间的等待时间。

通过这些配置,Kafka生产者可以在不同的网络条件和负载下优化消息发送的效率和可靠性。

在Kafka生产者的Sender线程工作流程中,如果一次任务中包含了来自多个分区的批次,并且这些批次涉及到多个broker,那么Sender线程会分别向这些broker发送请求

2. 在途请求缓存区

  • 存储在途请求:当Sender线程将消息批次发送到broker后,这些请求会存储在在途请求缓存区中,直到收到broker的确认响应。这个缓存区的大小由配置参数max.in.flight.requests.per.connection决定。

  • 重试机制:如果某个请求在指定时间内没有收到响应,生产者会根据配置的重试机制重新发送这些请求。重试机制配置参数包括retriesretry.backoff.ms

  • 顺序保证max.in.flight.requests.per.connection参数设置了每个连接(每个生产者和Kafka的连接)允许的最大未完成请求数。默认值是5。如果这个值设置过大,可能会导致消息重排序问题,特别是在启用了重试机制时。设置合适的值可以平衡并发性能和消息顺序保证。

  • 资源管理:在途请求缓存区的大小会影响生产者的内存使用和性能。如果在途请求过多,可能会占用大量内存资源,导致生产者性能下降。因此,合理设置这个缓存区的大小是优化生产者性能的关键。

Kafka Producer发送消息流程之分区器和数据收集器

Kafka Producer发送消息流程之分区器和数据收集器

1. Partitioner分区器

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,

1
int partition = partition(record, serializedKey, serializedValue, cluster);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

private final Partitioner partitioner;

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
//当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数
if (record.partition() != null)
return record.partition();

// 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据
if (partitioner != null) {
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(
"The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
}
return customPartition;
}


// 如果没有分区器的情况
if (serializedKey != null && !partitionerIgnoreKeys) {
// hash the keyBytes to choose a partition
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
return RecordMetadata.UNKNOWN_PARTITION;
}
}


// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}

2. 自定义分区器

新建类实现Partitioner接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class MyKafkaPartitioner implements Partitioner {
@Override
public int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
// Ensure the key is a non-null string
if (key == null || !(key instanceof String)) {
throw new IllegalArgumentException("Key must be a non-null String");
}

// Parse the key as an integer
int keyInt;
try {
keyInt = Integer.parseInt((String) key);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Key must be a numeric string", e);
}

// Determine the partition based on the key's odd/even nature
if (keyInt % 2 == 0) {
return 1; // Even keys go to partition 2
} else {
return 0; // Odd keys go to partition 0
}
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {

}
}

新建一个存在多分区的Topic。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class KafkaProducerPartitionorTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//指定拦截器
config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());
//指定分区器
config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test1",
"key"+i,
"我是你爹"+i
);
//发送record
producer.send(record);
Thread.sleep(500);
}

//关闭producer
producer.close();
}
}

配置好PARTITIONER_CLASS_CONFIG后发送消息。

可以分区器成功起作用了。

3. RecordAccumulator数据收集器

通过数据校验后,数据从分区器来到数据收集器

数据收集器的工作机制

  1. 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size来调整。

  2. 缓冲区管理

    • 每个分区都有一个或多个批次,每个批次包含多条消息。
    • 当一个批次填满(即达到batch.size),或者达到发送条件(如linger.ms时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender线程。
  3. 满批次处理

    • 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms的时间条件,RecordAccumulator会将该批次加入到一个待发送的队列中。
    • Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。