1. Partitioner分区器
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,
1 int partition = partition(record, serializedKey, serializedValue, cluster);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private final Partitioner partitioner;private int partition (ProducerRecord<K, V> record, byte [] serializedKey, byte [] serializedValue, Cluster cluster) { if (record.partition() != null ) return record.partition(); if (partitioner != null ) { int customPartition = partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); if (customPartition < 0 ) { throw new IllegalArgumentException (String.format( "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative." , customPartition)); } return customPartition; } if (serializedKey != null && !partitionerIgnoreKeys) { return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size()); } else { return RecordMetadata.UNKNOWN_PARTITION; } } public static int partitionForKey (final byte [] serializedKey, final int numPartitions) { return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; }
2. 自定义分区器 新建类实现Partitioner
接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class MyKafkaPartitioner implements Partitioner { @Override public int partition (String s, Object key, byte [] bytes, Object o1, byte [] bytes1, Cluster cluster) { if (key == null || !(key instanceof String)) { throw new IllegalArgumentException ("Key must be a non-null String" ); } int keyInt; try { keyInt = Integer.parseInt((String) key); } catch (NumberFormatException e) { throw new IllegalArgumentException ("Key must be a numeric string" , e); } if (keyInt % 2 == 0 ) { return 1 ; } else { return 0 ; } } @Override public void close () { } @Override public void configure (Map<String, ?> map) { } }
新建一个存在多分区的Topic。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class KafkaProducerPartitionorTest { public static void main (String[] args) throws InterruptedException { HashMap<String, Object> config = new HashMap <>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092" ); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName()); config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer <String, String>(config); for (int i = 0 ; i < 10 ; i++) { ProducerRecord<String, String> record = new ProducerRecord <String, String>( "test1" , "key" +i, "我是你爹" +i ); producer.send(record); Thread.sleep(500 ); } producer.close(); } }
配置好PARTITIONER_CLASS_CONFIG
后发送消息。
可以分区器成功起作用了。
3. RecordAccumulator数据收集器 通过数据校验后,数据从分区器
来到数据收集器
。
数据收集器的工作机制
队列缓存 :RecordAccumulator
为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size
来调整。
缓冲区管理 :
每个分区都有一个或多个批次,每个批次包含多条消息。
当一个批次填满(即达到batch.size
),或者达到发送条件(如linger.ms
时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender
线程。
满批次处理 :
当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms
的时间条件,RecordAccumulator
会将该批次加入到一个待发送的队列中。
Sender
线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。