Kafka Producer之事务性

Kafka Producer之事务性

事务性可以防止跨会话幂等性失效,同时也可以保证单个生产者的指定数据,要么全部成功要么全部失败,不限分区。不可以多个生产者共用相同的事务ID。

1. 跨会话幂等性失效

幂等性开启后,broker会对每个分区记录生产者状态,并且生产者具有PID,消息被标记为PID加上序列号,数据重复和有序都是在其基础之上运作的。

生产者重启等因素会导致PID变化,导致幂等性短暂失效。

2. 开启事务

因为事务是基于幂等性的,所以幂等性的配置都要有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package org.dragon.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerTransactionTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//配置acks等级
config.put(ProducerConfig.ACKS_CONFIG, "-1");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.RETRIES_CONFIG, 5);
// 把buffer改小一点,让测试数据组成更多batch
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
// 事务ID
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
//初始化事务
producer.initTransactions();

try {
// 开启事务
producer.beginTransaction();
for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test2",
"" + i,
"我是你爹" + i
);
//发送record
Future<RecordMetadata> send = producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("回调信息:消息发送成功");
}
});
System.out.println("发送数据");
send.get();
}
// 提交事务
producer.commitTransaction();
}catch(Exception e) {
// 中止事务
producer.abortTransaction();
e.printStackTrace();
}finally{
//关闭producer
producer.close();
}
}
}

3. 事务流程原理

  1. 查找联系事务管理器

  2. 根据设置的TRANSACTIONAL_ID_CONFIG计算PID,计算方式为哈希值%分区数量

  3. 初始化事务

  4. 将涉及到的分区信息发送给事务管理器,方便事务管理器管理和监控这些分区的事务状态。

  5. 生成数据,发送数据到对应Broker

  6. 对应Broker把分区信息发送给事务管理器,为了确认哪些分区确实已经收到了事务中的消息

  7. 对应Broker返回ACKS

  8. 生产者发起结束事务的请求

  9. 修改事务状态为准备提交

  10. 事务管理器将事务标记为成功或者失败,并通知对应broker。

  11. 修改事务状态为已提交

Kafka Producer之幂等性

Kafka Producer之幂等性

幂等性通过消耗时间和性能的方式,解决乱序和重复问题。

但是只能保证同一生产者在一个分区中的幂等性

1. 启用幂等性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//配置acks等级
config.put(ProducerConfig.ACKS_CONFIG, "-1");
//启用幂等性
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 消息失败重试次数
config.put(ProducerConfig.RETRIES_CONFIG, 5);
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

幂等性操作要求:

  1. ACKS = -1

  2. 开启重试机制

  3. 在途请求缓冲区不能大于5

2. 底层变化

消息会被标记,包含生产者ID和消息序列号。

( 如果生产者重启,那么ID会变化,这会使得下图记录无效,幂等性短暂失效。)

并且broker中的ProducerState会记录每个分区的生产者状态,包括最新5个消息的序列号。

3. 数据不重复

消息来到broker分区,经由ProducerState的数据进行对比,

  • 重复则丢弃消息,返回ack。

  • 否则Broker存储消息并返回ack。

4. 数据有序

消息来到broker分区,经由ProducerState的数据进行对比,

  • 如果新消息的序列号是连续的,Broker会接受并存储该消息,然后更新最新序列号。
  • 如果新消息的序列号不连续,Broker会认为这是重复消息或乱序消息,根据配置,它可能会丢弃或拒绝该消息。
  • 无论消息被接受还是丢弃,Broker都会返回一个ack给生产者。

不连续时可能拒绝多个消息,那么这些消息都会返回生产者重新发送,直到按顺序下一个消息到来,才存储并更新。

Kafka Producer之数据重复和乱序问题

Kafka Producer之数据重复和乱序问题

为了可靠性,Kafka有消息重试机制,但是同时也带来了2大问题

1. 数据重复

消息发送到broker后,broker记录消息数据到log中,但是由于网络问题,producer没有收到acks,于是再次发送消息。

除此之外,也可能是其他场景,导致了消息的重复。

2. 数据乱序

如图,消息2、3发送到了broker,但是data1因为网络问题没有到broker,然后被producer重试了,第二次到了,但是顺序乱了。

Kafka Producer之ACKS应答机制

Kafka Producer之ACKS应答机制

1. 应答机制

异步发送的效率高,但是不安全,同步发送安全,但是效率低。

无论哪一种,有一个关键的步骤叫做回调,也就是ACKS应答机制。

其中ACKS也分为3个等级。默认等级是all。

等级 效率 安全
all(-1) 效率低 安全性高
1 效率中 安全性中
0 效率高 安全性低

2. 等级0

  • 生产者发送消息到Kafka集群。
  • 消息进入网络发送队列。
  • 生产者立即返回(认为消息已发送),不等待任何Broker的确认。

3. 等级1

  • 生产者发送消息到Kafka集群。
  • Leader分区接收消息,将消息写入本地日志。
  • Leader分区将消息同步到磁盘(如果配置了日志刷新)。
  • Leader分区返回确认(ACK)给生产者。
  • 生产者收到ACK,继续处理下一条消息。

4. 等级all

  • 生产者发送消息到Kafka集群。
  • Leader分区接收消息,将消息写入本地日志。
  • Leader分区将消息同步到磁盘(如果配置了日志刷新)。
  • Leader分区将消息发送给所有同步副本(ISR)。
  • 每个同步副本(Follower)将消息写入本地日志并返回确认给Leader。
  • Leader分区收到所有同步副本的确认后,返回ACK给生产者。
  • 生产者收到ACK,继续处理下一条消息。

5. 设置等级

1
2
3
4
5
6
7
8
9
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//配置acks等级
config.put(ProducerConfig.ACKS_CONFIG, "-1");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

6. ISR

ISR的定义:

  • 成员:ISR包括Leader和所有与Leader保持同步的Follower分区。保持同步的标准是Follower分区的日志不落后于Leader分区超过指定的时间(由replica.lag.time.max.ms配置)。
  • 目的:确保在Leader发生故障时,可以迅速从ISR中选举一个新的Leader,从而保证分区的高可用性。

ISR的动态调整:

  • Kafka会动态调整ISR的成员。如果一个Follower分区落后于Leader超过一定的时间,Kafka会将其从ISR中移除。
  • 当该Follower分区重新追上Leader并满足同步标准时,Kafka会将其重新加入ISR。
Kafka Producer之消息异步发送和同步发送

Kafka Producer之消息异步发送和同步发送

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class KafkaProducerCallbackTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test2",
""+i,
"我是你爹"+i
);
//发送record
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("回调信息:消息发送成功");
}
});
System.out.println("发送数据");
}

//关闭producer
producer.close();
}
}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class KafkaProducerCallbackTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test2",
""+i,
"我是你爹"+i
);
//发送record
Future<RecordMetadata> send = producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("回调信息:消息发送成功");
}
});
System.out.println("发送数据");
send.get();
}

//关闭producer
producer.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
Kafka Producer发送消息流程之Sender发送线程和在途请求缓存区

Kafka Producer发送消息流程之Sender发送线程和在途请求缓存区

1. Sender发送数据

Sender线程负责将已经在RecordAccumulator中准备好的消息批次发送到Kafka集群。虽然消息在RecordAccumulator中是按照分区组织的,但Sender线程在发送这些消息时,会按照broker而不是分区来组织发送。这有助于提高发送效率和减少网络开销。

1. 发送数据的详细过程:

  1. 拉取批次Sender线程从RecordAccumulator中拉取已准备好发送的消息批次。这些批次可能来自多个分区。

  2. 按broker组织批次Sender线程会将这些批次按目标broker进行组织,因为一个broker通常负责多个分区的消息处理。这个过程涉及以下步骤:

    • 确定每个分区的leader broker。
    • 将属于同一个broker的所有分区的批次组合在一起。
  3. 发送请求Sender线程会为每个broker创建一个或多个Produce请求(ProduceRequest),然后通过网络将这些请求发送到对应的broker。这些请求包含了该broker负责的所有分区的消息批次。

  4. 处理响应Sender线程会等待broker的响应。响应中包含了每个分区的消息是否成功写入的信息。

    • 如果某个分区的消息写入失败,Sender线程会根据重试机制重试发送这些消息。
    • 如果所有消息都成功写入,Sender线程会从RecordAccumulator中移除这些消息批次。

2. 关键参数配置

以下是一些关键参数,可以影响Sender线程的行为:

  • max.in.flight.requests.per.connection:每个连接允许的最大未完成请求数。默认值为5。如果这个值过大,可能会导致消息重排序。
  • request.timeout.ms:请求超时时间。默认值为30秒。如果broker在此时间内没有响应,Sender线程会重试或失败。
  • retries:重试次数。默认值为0。指定Sender线程在发送消息失败时的重试次数。
  • retry.backoff.ms:重试间隔时间。默认值为100ms。指定每次重试之间的等待时间。

通过这些配置,Kafka生产者可以在不同的网络条件和负载下优化消息发送的效率和可靠性。

在Kafka生产者的Sender线程工作流程中,如果一次任务中包含了来自多个分区的批次,并且这些批次涉及到多个broker,那么Sender线程会分别向这些broker发送请求

2. 在途请求缓存区

  • 存储在途请求:当Sender线程将消息批次发送到broker后,这些请求会存储在在途请求缓存区中,直到收到broker的确认响应。这个缓存区的大小由配置参数max.in.flight.requests.per.connection决定。

  • 重试机制:如果某个请求在指定时间内没有收到响应,生产者会根据配置的重试机制重新发送这些请求。重试机制配置参数包括retriesretry.backoff.ms

  • 顺序保证max.in.flight.requests.per.connection参数设置了每个连接(每个生产者和Kafka的连接)允许的最大未完成请求数。默认值是5。如果这个值设置过大,可能会导致消息重排序问题,特别是在启用了重试机制时。设置合适的值可以平衡并发性能和消息顺序保证。

  • 资源管理:在途请求缓存区的大小会影响生产者的内存使用和性能。如果在途请求过多,可能会占用大量内存资源,导致生产者性能下降。因此,合理设置这个缓存区的大小是优化生产者性能的关键。

Kafka Producer发送消息流程之分区器和数据收集器

Kafka Producer发送消息流程之分区器和数据收集器

1. Partitioner分区器

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,

1
int partition = partition(record, serializedKey, serializedValue, cluster);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

private final Partitioner partitioner;

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
//当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数
if (record.partition() != null)
return record.partition();

// 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据
if (partitioner != null) {
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(
"The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
}
return customPartition;
}


// 如果没有分区器的情况
if (serializedKey != null && !partitionerIgnoreKeys) {
// hash the keyBytes to choose a partition
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
return RecordMetadata.UNKNOWN_PARTITION;
}
}


// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}

2. 自定义分区器

新建类实现Partitioner接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class MyKafkaPartitioner implements Partitioner {
@Override
public int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
// Ensure the key is a non-null string
if (key == null || !(key instanceof String)) {
throw new IllegalArgumentException("Key must be a non-null String");
}

// Parse the key as an integer
int keyInt;
try {
keyInt = Integer.parseInt((String) key);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Key must be a numeric string", e);
}

// Determine the partition based on the key's odd/even nature
if (keyInt % 2 == 0) {
return 1; // Even keys go to partition 2
} else {
return 0; // Odd keys go to partition 0
}
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {

}
}

新建一个存在多分区的Topic。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class KafkaProducerPartitionorTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//指定拦截器
config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());
//指定分区器
config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test1",
"key"+i,
"我是你爹"+i
);
//发送record
producer.send(record);
Thread.sleep(500);
}

//关闭producer
producer.close();
}
}

配置好PARTITIONER_CLASS_CONFIG后发送消息。

可以分区器成功起作用了。

3. RecordAccumulator数据收集器

通过数据校验后,数据从分区器来到数据收集器

数据收集器的工作机制

  1. 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size来调整。

  2. 缓冲区管理

    • 每个分区都有一个或多个批次,每个批次包含多条消息。
    • 当一个批次填满(即达到batch.size),或者达到发送条件(如linger.ms时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender线程。
  3. 满批次处理

    • 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms的时间条件,RecordAccumulator会将该批次加入到一个待发送的队列中。
    • Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。